A processor receives multiple messages with the same payload
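I'm starting a new project with 'spring-cloud-dataflow', developing a bunch of jars to fit my needs.
One of these is a processor that untars files coming from a file source. This application uses a customized version of integration-zip, with features to handle tar and gunzip file compression.
So my problem is the following: my source sends a single message with the file reference, but the processor receives that message multiple times, with the same payload but a different id.
Here are the log files of both components.
As you can see, the file source generates only one message: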
2017-10-02 12:38:28.013 INFO 17615 --- [ask-scheduler-3] o.s.i.file.FileReadingMessageSource : Created message: [GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={id=0b99b840-e3b3-f742-44ec-707aeea638c8, timestamp=1506940708013}]]
The processor receives 3 messages:
2017-10-02 12:38:28.077 INFO 17591 --- [ -L-1] o.s.i.codec.kryo.CompositeKryoRegistrar : registering [40, java.io.File] with serializer org.springframework.integration.codec.kryo.FileSerializer
2017-10-02 12:38:28.080 INFO 17591 --- [ -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Message 'GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={kafka_offset=1, id=1a4d4b9c-86fe-d3a8-d800-8013e8ae7027, kafka_receivedPartitionId=0, contentType=application/x-java-object;type=java.io.File, kafka_receivedTopic=untar.file, timestamp=1506940708079}]' unpacking started...
2017-10-02 12:38:28.080 INFO 17591 --- [ -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Check message's payload type to decompress
2017-10-02 12:38:29.106 INFO 17591 --- [ -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Message 'GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={kafka_offset=1, id=cd611ca4-4cd9-0624-0871-dcf93a9a0051, kafka_receivedPartitionId=0, contentType=application/x-java-object;type=java.io.File, kafka_receivedTopic=untar.file, timestamp=1506940709106}]' unpacking started...
2017-10-02 12:38:29.107 INFO 17591 --- [ -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Check message's payload type to decompress
2017-10-02 12:38:31.108 INFO 17591 --- [ -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Message 'GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={kafka_offset=1, id=97171a2e-29ac-2111-b838-3da7220f5e3c, kafka_receivedPartitionId=0, contentType=application/x-java-object;type=java.io.File, kafka_receivedTopic=untar.file, timestamp=1506940711108}]' unpacking started...
2017-10-02 12:38:31.108 INFO 17591 --- [ -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Check message's payload type to decompress
2017-10-02 12:38:31.116 ERROR 17591 --- [ -L-1] o.s.integration.handler.LoggingHandler : org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is org.springframework.messaging.MessageHandlingException: Failed to apply Zip transformation.; nested exception is java.io.FileNotFoundException: /tmp/patent/CNINO_im_201733_batch108.tgz (File o directory non esistente), failedMessage=GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={kafka_offset=1, id=97171a2e-29ac-2111-b838-3da7220f5e3c, kafka_receivedPartitionId=0, contentType=application/x-java-object;type=java.io.File, kafka_receivedTopic=untar.file, timestamp=1506940711108}], failedMessage=GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={kafka_offset=1, id=97171a2e-29ac-2111-b838-3da7220f5e3c, kafka_receivedPartitionId=0, contentType=application/x-java-object;type=java.io.File, kafka_receivedTopic=untar.file, timestamp=1506940711108}]
at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44)
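I can't find a solution to this problem. Has anyone run into the same issue and found a way to fix it? Or is there some configuration I'm missing?
EDIT:
I'm using the local version of SCDF 1.2.2.RELEASE, so the file I/O operations work on the same filesystem, and I use version Ditmars.BUILD-SNAPSHOT for SCS.
Unfortunately, even if I disable the file delete operation, the application still processes the message multiple times. Here are some code snippets, and if you like this is my project repo:
This is my processor class: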
@EnableBinding(Processor.class)
@EnableConfigurationProperties(UnTarProperties.class)
public class UnTarProcessor {

    @Autowired
    private UnTarProperties properties;

    @Autowired
    private Processor processor;

    @Bean
    public UncompressedResultSplitter splitter() {
        return new UncompressedResultSplitter();
    }

    @Bean
    public UnTarGzTransformer transformer() {
        UnTarGzTransformer unTarGzTransformer = new UnTarGzTransformer(properties.isUseGzCompression());
        unTarGzTransformer.setExpectSingleResult(properties.isSingleResult());
        unTarGzTransformer.setWorkDirectory(new File(properties.getWorkDirectory()));
        unTarGzTransformer.setDeleteFiles(properties.isDeleteFile());
        return unTarGzTransformer;
    }

    @Bean
    public IntegrationFlow process() {
        return IntegrationFlows.from(processor.input())
                .transform(transformer())
                .split(splitter())
                .channel(processor.output())
                .get();
    }
}
This is the core method that decompresses the file:
@Override
protected Object doCompressTransform(final Message<?> message) throws Exception {
    logger.info(String.format("Message '%s' unpacking started...", message));
    try (InputStream checkMessage = checkMessage(message);
         InputStream inputStream = (gzCompression
                 ? new BufferedInputStream(new GZIPInputStream(checkMessage))
                 : new BufferedInputStream(checkMessage))) {
        final Object payload = message.getPayload();
        final Object unzippedData;
        try (TarArchiveInputStream tarIn = new TarArchiveInputStream(inputStream)) {
            TarArchiveEntry entry = null;
            final SortedMap<String, Object> uncompressedData = new TreeMap<String, Object>();
            while ((entry = (TarArchiveEntry) tarIn.getNextEntry()) != null) {
                final String zipEntryName = entry.getName();
                final Date zipEntryTime = entry.getLastModifiedDate();
                final long zipEntryCompressedSize = entry.getSize();
                final String type = entry.isDirectory() ? "directory" : "file";
                final File tempDir = new File(workDirectory, message.getHeaders().getId().toString());
                tempDir.mkdirs(); // NOSONAR false positive
                final File destinationFile = new File(tempDir, zipEntryName);
                if (entry.isDirectory()) {
                    destinationFile.mkdirs(); // NOSONAR false positive
                }
                else {
                    unpackEntries(tarIn, entry, tempDir);
                    uncompressedData.put(zipEntryName, destinationFile);
                }
            }
            if (uncompressedData.isEmpty()) {
                unzippedData = null;
            }
            else {
                if (this.expectSingleResult) {
                    if (uncompressedData.size() == 1) {
                        unzippedData = uncompressedData.values().iterator().next();
                    }
                    else {
                        throw new MessagingException(message, String.format("The UnZip operation extracted %s "
                                + "result objects but expectSingleResult was 'true'.", uncompressedData.size()));
                    }
                }
                else {
                    unzippedData = uncompressedData;
                }
            }
            logger.info("Payload unpacking completed...");
        }
        finally {
            // Note: this runs whether or not unpacking succeeded.
            if (payload instanceof File && this.deleteFiles) {
                final File filePayload = (File) payload;
                if (!filePayload.delete() && logger.isWarnEnabled()) {
                    logger.warn("failed to delete File '" + filePayload + "'");
                }
            }
        }
        return unzippedData;
    }
    catch (Exception e) {
        throw new MessageHandlingException(message, "Failed to apply Zip transformation.", e);
    }
}
The checkMessage() method throws the exception:
protected InputStream checkMessage(Message<?> message) throws FileNotFoundException {
    logger.info("Check message's payload type to decompress");
    InputStream inputStream;
    Object payload = message.getPayload();
    if (payload instanceof File) {
        final File filePayload = (File) payload;
        if (filePayload.isDirectory()) {
            throw new UnsupportedOperationException(String.format("Cannot unzip a directory: '%s'",
                    filePayload.getAbsolutePath()));
        }
        inputStream = new FileInputStream(filePayload);
    }
    else if (payload instanceof InputStream) {
        inputStream = (InputStream) payload;
    }
    else if (payload instanceof byte[]) {
        inputStream = new ByteArrayInputStream((byte[]) payload);
    }
    else {
        throw new IllegalArgumentException(String.format("Unsupported payload type '%s'. " +
                "The only supported payload types are java.io.File, byte[] and java.io.InputStream",
                payload.getClass().getSimpleName()));
    }
    return inputStream;
}
Any help is much appreciated. Thanks a lot.
We need more information: the versions of SCDF and of the SCS apps, and at least the DSL showing how you deployed your apps.
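I just checked your logs. Did you realize that your consumer is failing to consume the message because of a FileNotFoundException? You are not receiving the same message multiple times; SCS is just trying to redeliver it before giving up. Check your full logs to see why the file at the specified location cannot be opened.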
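The exception happens in your transformer, and because of the SCS retry configuration you see the message multiple times. Since the error is in your own logic, it is hard for us to diagnose. It says FileNotFoundException.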
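The timestamps in the processor log (12:38:28, 12:38:29, 12:38:31) are consistent with three delivery attempts separated by roughly one and two seconds. The sketch below is a plain-Java simulation of such a retry loop, not the binder's actual code. `RetrySketch`, `deliverWithRetry`, and the parameter values are hypothetical; the values mirror what I believe are the Spring Cloud Stream consumer defaults (`maxAttempts=3`, `backOffInitialInterval=1000`, `backOffMultiplier=2.0`), so check them against your version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RetrySketch {

    /**
     * Calls the handler until it succeeds or maxAttempts is reached.
     * Records the delay that would be waited before each re-delivery in delaysOut
     * and returns the number of attempts made.
     */
    public static int deliverWithRetry(String payload, Consumer<String> handler,
                                       int maxAttempts, long initialDelayMs, double multiplier,
                                       List<Long> delaysOut) {
        long delay = initialDelayMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(payload);
                return attempt; // success: no further deliveries
            } catch (RuntimeException e) {
                if (attempt < maxAttempts) {
                    delaysOut.add(delay); // a real retry loop would sleep here
                    delay = (long) (delay * multiplier);
                }
            }
        }
        // All attempts failed; a real binder would now log the error or dead-letter the message.
        return maxAttempts;
    }

    public static void main(String[] args) {
        List<Long> delays = new ArrayList<>();
        int[] calls = {0};
        int attempts = deliverWithRetry("/tmp/patent/CNINO_im_201733_batch108.tgz",
                p -> {
                    calls[0]++; // the handler sees the same payload on every attempt
                    throw new IllegalStateException("cannot open " + p);
                },
                3, 1000L, 2.0, delays);
        // prints "3 attempts, handler called 3 times, waits [1000, 2000]"
        System.out.println(attempts + " attempts, handler called " + calls[0]
                + " times, waits " + delays);
    }
}
```

If this is what happens in your case, the differing message ids would simply come from the message being rebuilt on each attempt, while `kafka_offset=1` stays the same in all three log entries.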
I don't know what in your process puts the file there; that might be the cause. It seems unrelated to SCS.
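One possible way to end up with a FileNotFoundException on a later attempt, assuming `deleteFiles` is enabled: the `finally` block in `doCompressTransform` deletes the payload file even when unpacking fails, so any re-delivery finds nothing to open and the original error is masked. The sketch below reproduces that pattern with plain `java.nio`; `DeleteInFinallySketch`, `process`, and `simulate` are hypothetical names, and the "unpack failure" is simulated rather than taken from your code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class DeleteInFinallySketch {

    /** Mimics a transformer that removes its input in a finally block, even on failure. */
    public static void process(Path input, boolean failProcessing) throws IOException {
        Files.readAllBytes(input); // throws NoSuchFileException if the input is already gone
        try {
            if (failProcessing) {
                throw new IOException("simulated unpack failure");
            }
        } finally {
            Files.deleteIfExists(input); // runs on success and on failure
        }
    }

    /** Delivers the same path three times and returns the exception type seen on each attempt. */
    public static List<String> simulate() throws IOException {
        Path input = Files.createTempFile("batch", ".tgz");
        List<String> failures = new ArrayList<>();
        for (int attempt = 1; attempt <= 3; attempt++) {
            try {
                process(input, true);
            } catch (IOException e) {
                failures.add(e.getClass().getSimpleName());
            }
        }
        return failures;
    }

    public static void main(String[] args) throws IOException {
        // prints "[IOException, NoSuchFileException, NoSuchFileException]"
        System.out.println(simulate());
    }
}
```

Only the last failure would typically be reported once retries are exhausted, so it may be worth logging the exception inside the transformer on the first attempt, or deleting the file only after a successful unpack.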