Spring Cloud DataFlow - 如何在 TCP 源中使用自定义 TCP encoder/decoder
Spring Cloud DataFlow - How to use a custom TCP encoder/decoder in TCP source
我之前已经针对 Spring XD 提过这个问题。我现在正在尝试迁移到 Spring Cloud Data Flow(Spring CDF)。
我找到了这个链接(this link),并尝试重用其中的代码,把其中的编码方式换成我自己的实现。
我创建了以下 POM:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build for the custom TCP BER source application (packaged as a jar). -->
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>tcp-ber-source</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<name>TCP Ber Source</name>
<!-- Inherits build defaults from the Spring Boot 1.4.2.RELEASE starter parent. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Build-wide settings: UTF-8 encodings, Java 1.8, and the version referenced below for tcp-app-starters-common. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<tcp-app-starters-common.version>1.1.0.RELEASE</tcp-app-starters-common.version>
</properties>
<dependencies>
<!-- No version is declared here; presumably it is managed by the spring-cloud-dependencies BOM imported below (confirm). -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<!-- Common TCP app-starter artifact; its version comes from the tcp-app-starters-common.version property. -->
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>tcp-app-starters-common</artifactId>
<version>${tcp-app-starters-common.version}</version>
</dependency>
<!-- Project-local artifact; presumably supplies the BER byte-array serializers used by the source (confirm). -->
<dependency>
<groupId>com.example</groupId>
<artifactId>ber-byte-array-serializers</artifactId>
<version>1.0</version>
</dependency>
<!-- Declared optional, so it is not passed on transitively to dependents of this artifact. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- Test scope only. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<!-- Imports the Spring Cloud Camden.SR3 BOM (type pom, scope import) for dependency version management. -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<!-- Spring Boot Maven plugin; no additional plugin configuration is set here. -->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
配置:
@EnableBinding(Source.class)
@EnableConfigurationProperties(TcpBerSourceProperties.class)
public class TcpBerSourceConfiguration {
private final Source channels;
private final TcpBerSourceProperties properties;
@Autowired
public TcpSourceConfiguration(final TcpBerSourceProperties properties, final Source channels) {
this.properties = properties;
this.channels = channels;
}
@Bean
public TcpReceivingChannelAdapter adapter(@Qualifier("tcpBerSourceConnectionFactory") final AbstractConnectionFactory connectionFactory) {
final TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(connectionFactory);
adapter.setOutputChannel(this.channels.output());
return adapter;
}
@Bean
public TcpConnectionFactoryFactoryBean tcpBerSourceConnectionFactory(@Qualifier("tcpBerSourceDecoder") final AbstractByteArraySerializer decoder) throws Exception {
final TcpConnectionFactoryFactoryBean factoryBean = new TcpConnectionFactoryFactoryBean();
factoryBean.setType("server");
factoryBean.setPort(this.properties.getPort());
factoryBean.setUsingNio(this.properties.isNio());
factoryBean.setUsingDirectBuffers(this.properties.isUseDirectBuffers());
factoryBean.setLookupHost(this.properties.isReverseLookup());
factoryBean.setDeserializer(decoder);
factoryBean.setSoTimeout(this.properties.getSocketTimeout());
return factoryBean;
}
@Bean
public BerEncoderDecoderFactoryBean tcpBerSourceDecoder() {
final BerEncoderDecoderFactoryBean factoryBean = new BerEncoderDecoderFactoryBean(this.properties.getDecoder());
factoryBean.setMaxMessageSize(this.properties.getBufferSize());
return factoryBean;
}
}
还有这个 FactoryBean:
/**
 * Factory bean that produces the {@link AbstractByteArraySerializer} matching a
 * {@link BerEncoding}, passing on the application event publisher and, when
 * one was set, the maximum message size.
 */
public class BerEncoderDecoderFactoryBean extends AbstractFactoryBean<AbstractByteArraySerializer> implements ApplicationEventPublisherAware {

    private final BerEncoding encoding;

    private ApplicationEventPublisher applicationEventPublisher;

    /** Stays {@code null} until {@link #setMaxMessageSize(int)} is called. */
    private Integer maxMessageSize;

    /**
     * @param encoding the BER encoding to build a serializer for; must not be null
     */
    public BerEncoderDecoderFactoryBean(final BerEncoding encoding) {
        Assert.notNull(encoding, "'encoding' cannot be null");
        this.encoding = encoding;
    }

    @Override
    public void setApplicationEventPublisher(final ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    /**
     * Sets the upper bound on message size applied to the created serializer.
     *
     * @param maxMessageSize the maximum message size.
     */
    public void setMaxMessageSize(final int maxMessageSize) {
        this.maxMessageSize = maxMessageSize;
    }

    /**
     * Instantiates the serializer for the configured encoding and applies the
     * event publisher and the optional size limit to it.
     */
    @Override
    protected AbstractByteArraySerializer createInstance() throws Exception {
        final AbstractByteArraySerializer serializer = newSerializer();
        serializer.setApplicationEventPublisher(this.applicationEventPublisher);
        final Integer limit = this.maxMessageSize;
        if (limit != null) {
            serializer.setMaxMessageSize(limit);
        }
        return serializer;
    }

    @Override
    public Class<?> getObjectType() {
        return AbstractByteArraySerializer.class;
    }

    /** Maps the configured encoding to a fresh serializer instance. */
    private AbstractByteArraySerializer newSerializer() {
        switch (this.encoding) {
            case SPLIT:
                return new ByteArrayBerSplitSerializer();
            case EXTRACT:
                return new ByteArrayBerExtractSerializer();
            default:
                throw new IllegalArgumentException("Invalid encoding: " + this.encoding);
        }
    }
}
BerEncoding 是一个简单的枚举,TcpBerSourceProperties 非常简单。
这是正确的方法吗?
如果是,我该如何运行它?在上面提到的链接里的 TCP 流应用启动器(stream app starters)中,我到处都找不到能让它作为 Spring Boot 独立应用程序运行的 @SpringBootApplication。
您必须创建自己的 Spring Boot 应用程序类,并导入该配置类;参见关于创建自定义应用的文档(the documentation about creating custom apps)。
我们会基于这些 starter 生成标准应用程序(分别对应 RabbitMQ 和 Kafka 的 binder),具体做法见此处的说明(explained here)。
我之前已经针对 Spring XD 提过这个问题。我现在正在尝试迁移到 Spring Cloud Data Flow(Spring CDF)。
我找到了这个链接(this link),并尝试重用其中的代码,把其中的编码方式换成我自己的实现。
我创建了以下 POM:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build for the custom TCP BER source application (packaged as a jar). -->
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>tcp-ber-source</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<name>TCP Ber Source</name>
<!-- Inherits build defaults from the Spring Boot 1.4.2.RELEASE starter parent. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Build-wide settings: UTF-8 encodings, Java 1.8, and the version referenced below for tcp-app-starters-common. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<tcp-app-starters-common.version>1.1.0.RELEASE</tcp-app-starters-common.version>
</properties>
<dependencies>
<!-- No version is declared here; presumably it is managed by the spring-cloud-dependencies BOM imported below (confirm). -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<!-- Common TCP app-starter artifact; its version comes from the tcp-app-starters-common.version property. -->
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>tcp-app-starters-common</artifactId>
<version>${tcp-app-starters-common.version}</version>
</dependency>
<!-- Project-local artifact; presumably supplies the BER byte-array serializers used by the source (confirm). -->
<dependency>
<groupId>com.example</groupId>
<artifactId>ber-byte-array-serializers</artifactId>
<version>1.0</version>
</dependency>
<!-- Declared optional, so it is not passed on transitively to dependents of this artifact. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- Test scope only. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<!-- Imports the Spring Cloud Camden.SR3 BOM (type pom, scope import) for dependency version management. -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<!-- Spring Boot Maven plugin; no additional plugin configuration is set here. -->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
配置:
@EnableBinding(Source.class)
@EnableConfigurationProperties(TcpBerSourceProperties.class)
public class TcpBerSourceConfiguration {
private final Source channels;
private final TcpBerSourceProperties properties;
@Autowired
public TcpSourceConfiguration(final TcpBerSourceProperties properties, final Source channels) {
this.properties = properties;
this.channels = channels;
}
@Bean
public TcpReceivingChannelAdapter adapter(@Qualifier("tcpBerSourceConnectionFactory") final AbstractConnectionFactory connectionFactory) {
final TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(connectionFactory);
adapter.setOutputChannel(this.channels.output());
return adapter;
}
@Bean
public TcpConnectionFactoryFactoryBean tcpBerSourceConnectionFactory(@Qualifier("tcpBerSourceDecoder") final AbstractByteArraySerializer decoder) throws Exception {
final TcpConnectionFactoryFactoryBean factoryBean = new TcpConnectionFactoryFactoryBean();
factoryBean.setType("server");
factoryBean.setPort(this.properties.getPort());
factoryBean.setUsingNio(this.properties.isNio());
factoryBean.setUsingDirectBuffers(this.properties.isUseDirectBuffers());
factoryBean.setLookupHost(this.properties.isReverseLookup());
factoryBean.setDeserializer(decoder);
factoryBean.setSoTimeout(this.properties.getSocketTimeout());
return factoryBean;
}
@Bean
public BerEncoderDecoderFactoryBean tcpBerSourceDecoder() {
final BerEncoderDecoderFactoryBean factoryBean = new BerEncoderDecoderFactoryBean(this.properties.getDecoder());
factoryBean.setMaxMessageSize(this.properties.getBufferSize());
return factoryBean;
}
}
还有这个 FactoryBean:
/**
 * Factory bean that produces the {@link AbstractByteArraySerializer} matching a
 * {@link BerEncoding}, passing on the application event publisher and, when
 * one was set, the maximum message size.
 */
public class BerEncoderDecoderFactoryBean extends AbstractFactoryBean<AbstractByteArraySerializer> implements ApplicationEventPublisherAware {

    private final BerEncoding encoding;

    private ApplicationEventPublisher applicationEventPublisher;

    /** Stays {@code null} until {@link #setMaxMessageSize(int)} is called. */
    private Integer maxMessageSize;

    /**
     * @param encoding the BER encoding to build a serializer for; must not be null
     */
    public BerEncoderDecoderFactoryBean(final BerEncoding encoding) {
        Assert.notNull(encoding, "'encoding' cannot be null");
        this.encoding = encoding;
    }

    @Override
    public void setApplicationEventPublisher(final ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    /**
     * Sets the upper bound on message size applied to the created serializer.
     *
     * @param maxMessageSize the maximum message size.
     */
    public void setMaxMessageSize(final int maxMessageSize) {
        this.maxMessageSize = maxMessageSize;
    }

    /**
     * Instantiates the serializer for the configured encoding and applies the
     * event publisher and the optional size limit to it.
     */
    @Override
    protected AbstractByteArraySerializer createInstance() throws Exception {
        final AbstractByteArraySerializer serializer = newSerializer();
        serializer.setApplicationEventPublisher(this.applicationEventPublisher);
        final Integer limit = this.maxMessageSize;
        if (limit != null) {
            serializer.setMaxMessageSize(limit);
        }
        return serializer;
    }

    @Override
    public Class<?> getObjectType() {
        return AbstractByteArraySerializer.class;
    }

    /** Maps the configured encoding to a fresh serializer instance. */
    private AbstractByteArraySerializer newSerializer() {
        switch (this.encoding) {
            case SPLIT:
                return new ByteArrayBerSplitSerializer();
            case EXTRACT:
                return new ByteArrayBerExtractSerializer();
            default:
                throw new IllegalArgumentException("Invalid encoding: " + this.encoding);
        }
    }
}
BerEncoding 是一个简单的枚举,TcpBerSourceProperties 非常简单。
这是正确的方法吗?
如果是,我该如何运行它?在上面提到的链接里的 TCP 流应用启动器(stream app starters)中,我到处都找不到能让它作为 Spring Boot 独立应用程序运行的 @SpringBootApplication。
您必须创建自己的 Spring Boot 应用程序类,并导入该配置类;参见关于创建自定义应用的文档(the documentation about creating custom apps)。
我们会基于这些 starter 生成标准应用程序(分别对应 RabbitMQ 和 Kafka 的 binder),具体做法见此处的说明(explained here)。