Spring-xd Redis sink with Collection type MAP

Spring-xd Redis sink with Collection type MAP

我正在尝试实现特定于集合类型的 Redis 接收器 = MAP

stream create tTEST_GF_SINK --definition "trigger --initialDelay=0 --fixedDelay=1 --timeUnit=MINUTES --payload= 'new Date().toString()' --outputType=application/json | 
  transform --expression='new java.util.Date().toString()'| 
  redis --collectionType=MAP --key=1" --deploy

对于所有其他集合类型,如 LIST 、 SET 、 ZSET 流能够写入这些 redis 接收器,但是当我使用 MAP 作为集合类型时,它会在 redis_mapkey.[=13 上抛出以下错误=]

2017-06-30T16:47:59-0400 1.3.1.RELEASE ERROR task-scheduler-3 handler.LoggingHandler - org.springframework.messaging.MessageHandlingException: Failed to store Message data in Redis collection; nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8): Property or field 'redis_mapKey' cannot be found on object of type 'org.springframework.messaging.MessageHeaders' - maybe not public? at org.springframework.integration.redis.outbound.RedisStoreWritingMessageHandler.handleMessageInternal(RedisStoreWritingMessageHandler.java:278) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:161) at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:251) at org.springframework.integration.endpoint.AbstractPollingEndpoint.access[=12=]0(AbstractPollingEndpoint.java:57) at org.springframework.integration.endpoint.AbstractPollingEndpoint.call(AbstractPollingEndpoint.java:176) at org.springframework.integration.endpoint.AbstractPollingEndpoint.call(AbstractPollingEndpoint.java:173) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:330) at org.springframework.integration.util.ErrorHandlingTaskExecutor.run(ErrorHandlingTaskExecutor.java:55) at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:324) at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8): Property or field 'redis_mapKey' cannot be found on object of type 'org.springframework.messaging.MessageHeaders' - maybe not public? at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:224) at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:94) at org.springframework.expression.spel.ast.PropertyOrFieldReference.access[=12=]0(PropertyOrFieldReference.java:46) at org.springframework.expression.spel.ast.PropertyOrFieldReference$AccessorLValue.getValue(PropertyOrFieldReference.java:374) at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88) at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:120) at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:267) at org.springframework.integration.redis.outbound.RedisStoreWritingMessageHandler.determineMapKey(RedisStoreWritingMessageHandler.java:415) at org.springframework.integration.redis.outbound.RedisStoreWritingMessageHandler.writeToMap(RedisStoreWritingMessageHandler.java:379) at org.springframework.integration.redis.outbound.RedisStoreWritingMessageHandler.handleMessageInternal(RedisStoreWritingMessageHandler.java:271) ... 99 more

谁能帮帮我

使用 MAP collection,涉及两个键;存储地图的键以及元素存储在地图中的键。

XD 模块不会将 mapKey 公开为 属性。您可以在上游添加 header-enricher 以设置 redis_mapKey header,或者您可以编辑模块的配置以添加 map-key-expression.

例如,像这样编辑 xd/modules/sink/redis/config/redis.xml...

<beans:beans profile="use-store-expression">
    <redis:store-outbound-channel-adapter
        map-key-expression="payload.substring(17)"
        key-expression="${keyExpression}" collection-type="${collectionType}"
        channel="input" connection-factory="redisConnectionFactory" />
</beans:beans>

与...

xd:>stream create foo --definition "time | redis --collectionType=MAP --key='foo'" --deploy

结果...

127.0.0.1:6379> hgetall "foo"
 1) "52"
 2) "2017-07-01 08:43:52"
 3) "53"
 4) "2017-07-01 08:43:53"
 5) "54"
 6) "2017-07-01 08:43:54"
 7) "55"
 8) "2017-07-01 08:43:55"
...

@加里。我尝试了您对 header-enricher 添加 redis_mapkey 的建议。它工作正常。

stream create tTEST_GF_SINK --definition "trigger --initialDelay=0 --fixedDelay=1 --timeUnit=MINUTES --payload= 'new Date().toString()' --outputType=application/json |header-enricher --headers={\"redis_mapKey\":\"payload.substring(17)\"}| transform --expression='new java.util.Date().toString()'| redis --collectionType=MAP --key=1" --deploy

127.0.0.1:6379> hgetall 1

1) "ng()"

2) "Sat Jul 01 23:29:16 EDT 2017"