Spring XD:从文件源到批处理作业的管道 (>) 失败(IllegalArgumentException:无法将提供的 JSON 转换为 Map<String, Object>)

Spring XD: pipe (>) from file source to batch job fails (IllegalArgumentException: Unable to convert provided JSON to Map<String, Object>)

我正在尝试将 Spring XD 文件源的输出(文件内容)通过管道传输到一个简单的批处理作业。这失败并出现以下异常。在启动我的作业时,XD 似乎试图错误地将文件内容用作 JSON 作业参数。您可能会注意到,我用来测试的文件是 .gitignore.

02:01:11,568 1.1.0.RC1  INFO DeploymentsPathChildrenCache-0 module.ModuleDeployer - Deployed ResourceConfiguredModule [name=file, type=source, group=mystream1, index=0 @5c617835]
02:01:11,571 1.1.0.RC1 DEBUG task-scheduler-9 support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'integrationEvaluationContext'
02:01:11,573 1.1.0.RC1 DEBUG task-scheduler-9 support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'output'
02:01:11,575 1.1.0.RC1  INFO DeploymentSupervisor-0 server.StreamDeploymentListener - Deployment status for stream 'mystream1': DeploymentStatus{state=deployed}
02:01:11,577 1.1.0.RC1  INFO DeploymentSupervisor-0 server.StreamDeploymentListener - Stream Stream{name='mystream1'} deployment attempt complete
02:01:11,577 1.1.0.RC1 DEBUG task-scheduler-9 local.LocalMessageBus - outbound.job:testjob1 received message: GenericMessage [payload=C:\Users\me\Desktop\testfiles\.gitignore, headers={timestamp=1423465271577, id=de52e134-2dbc-454f-b27d-b032e89e6254, contentType=text/plain}]
02:01:11,578 1.1.0.RC1 DEBUG task-scheduler-1 local.LocalMessageBus - inbound.job:testjob1 received message: GenericMessage [payload=C:\Users\me\Desktop\testfiles\.gitignore, headers={timestamp=1423465271577, id=de52e134-2dbc-454f-b27d-b032e89e6254, contentType=text/plain}]
02:01:11,581 1.1.0.RC1 DEBUG task-scheduler-1 job.JobLaunchRequestTransformer - JobParameters are provided as 'String'. Convertering to Spring Batch JobParameters...
02:01:11,584 1.1.0.RC1 DEBUG task-scheduler-1 support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'errorChannel'
02:01:11,588 1.1.0.RC1 ERROR task-scheduler-1 handler.LoggingHandler - org.springframework.integration.transformer.MessageTransformationException: ; nested exception is org.springframework.messaging.MessageHandlingException: ; nested exception is java.lang.IllegalArgumentException: Unable to convert provided JSON to Map<String, Object>
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:74)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:107)
    at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:87)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
    at com.sun.proxy.$Proxy123.handleMessage(Unknown Source)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:277)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:239)
    at sun.reflect.GeneratedMethodAccessor77.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.integration.monitor.DirectChannelMetrics.monitorSend(DirectChannelMetrics.java:114)
    at org.springframework.integration.monitor.DirectChannelMetrics.doInvoke(DirectChannelMetrics.java:98)
    at org.springframework.integration.monitor.DirectChannelMetrics.invoke(DirectChannelMetrics.java:92)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
    at com.sun.proxy.$Proxy120.send(Unknown Source)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:248)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:171)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:119)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:74)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:219)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access[=13=]0(AbstractPollingEndpoint.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.call(AbstractPollingEndpoint.java:149)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.call(AbstractPollingEndpoint.java:146)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:298)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.run(ErrorHandlingTaskExecutor.java:52)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:292)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
Caused by: org.springframework.messaging.MessageHandlingException: ; nested exception is java.lang.IllegalArgumentException: Unable to convert provided JSON to Map<String, Object>
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:78)
    at org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:64)
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:68)
    ... 59 more
Caused by: java.lang.IllegalArgumentException: Unable to convert provided JSON to Map<String, Object>
    at org.springframework.xd.dirt.plugins.job.ExpandedJobParametersConverter.getJobParametersForJsonString(ExpandedJobParametersConverter.java:169)
    at org.springframework.xd.dirt.plugins.job.JobLaunchRequestTransformer.toJobLaunchRequest(JobLaunchRequestTransformer.java:128)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:112)
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:129)
    at org.springframework.expression.spel.ast.MethodReference.access[=13=]0(MethodReference.java:49)
    at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:342)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:131)
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:330)
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:164)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:276)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:142)
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:75)
    ... 61 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'C': was expecting ('true', 'false' or 'null')
 at [Source: C:\Users\me\Desktop\testfiles\.gitignore; line: 1, column: 2]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1419)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:508)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2300)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1459)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:683)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3105)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3051)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2175)
    at org.springframework.xd.dirt.plugins.job.ExpandedJobParametersConverter.getJobParametersForJsonString(ExpandedJobParametersConverter.java:166)
    ... 77 more

我的工作配置 XML 如下所示:

<context:component-scan base-package="com.me" use-default-filters="false">
        <context:include-filter type="annotation" expression="org.springframework.context.annotation.Configuration" />
    </context:component-scan>

    <int:channel id="input" />

    <batch:job id="filePollJob" restartable="true">
        <batch:step id="stepOne">
            <batch:tasklet>
                <batch:chunk reader="testReader" writer="testWriter" commit-interval="3"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>

<bean id="testReader" class="com.me.TestItemReader" scope="step">
        <property name="resource" value="#{jobParameters['input.file']}"/>
        <property name="lineMapper" ref="testLineMapper" />
    </bean>

    <bean id="testLineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
        <property name="lineTokenizer">
            <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer" />
        </property>
        <property name="fieldSetMapper">
            <bean class="org.springframework.batch.item.file.mapping.PassThroughFieldSetMapper" />
        </property>
    </bean>

    <bean id="testWriter" class="com.me.TestItemWriter" />

我的 XD shell 命令:

xd:>job create myjob --definition "file-batch"
Successfully created job 'myjob'
xd:>job deploy myjob
Deployed job 'myjob'
xd:>stream create mystream --definition "file --dir='C:\Users\me\Desktop\testfiles\' --outputType=text/plain > queue:job:myjob" --deploy
Created and deployed new stream 'mystream'

感谢您提供的任何帮助!

默认情况下,文件源通过总线发送内容;您需要发送文件本身,而不是其内容 (ref=true)。

这个this sample举例。