WSO2 流处理器中的多个源

Multiple Sources in WSO2 Stream Processor

我正在尝试使用两个源(都是文件)在 WSO2 流处理器上配置一个 Siddhi 应用程序,但这不起作用(一个源工作正常)。但是,即使我将应用程序拆分为也不起作用的 Siddhi 应用程序。两种情况的日志如下:

[2018-01-25 08:51:20,583]  INFO {org.quartz.impl.StdSchedulerFactory} - Using default implementation for ThreadExecutor
[2018-01-25 08:51:20,586]  INFO {org.quartz.simpl.SimpleThreadPool} - Job execution threads will use class loader of thread: Timer-0
[2018-01-25 08:51:20,599]  INFO {org.quartz.core.SchedulerSignalerImpl} - Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl
[2018-01-25 08:51:20,599]  INFO {org.quartz.core.QuartzScheduler} - Quartz Scheduler v.2.3.0 created.
[2018-01-25 08:51:20,600]  INFO {org.quartz.simpl.RAMJobStore} - RAMJobStore initialized.
[2018-01-25 08:51:20,601]  INFO {org.quartz.core.QuartzScheduler} - Scheduler meta-data: Quartz Scheduler (v2.3.0) 'polling-task-runner' with instanceId 'NON_CLUSTERED'
  Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
  NOT STARTED.
  Currently in standby mode.
  Number of jobs executed: 0
  Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 1 threads.
  Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.

[2018-01-25 08:51:20,601]  INFO {org.quartz.impl.StdSchedulerFactory} - Quartz scheduler 'polling-task-runner' initialized from an externally provided properties instance.
[2018-01-25 08:51:20,601]  INFO {org.quartz.impl.StdSchedulerFactory} - Quartz scheduler version: 2.3.0
[2018-01-25 08:51:20,601]  INFO {org.quartz.core.QuartzScheduler} - Scheduler polling-task-runner_$_NON_CLUSTERED started.
[2018-01-25 08:51:20,604]  INFO {org.quartz.core.QuartzScheduler} - Scheduler polling-task-runner_$_NON_CLUSTERED started.
[2018-01-25 08:51:20,605] ERROR {org.wso2.carbon.connector.framework.server.polling.PollingTaskRunner} - Exception occurred while scheduling job org.quartz.ObjectAlreadyExistsException: Unable to store Trigger with name: 'scheduledPoll' and group: 'group1', because one already exists with this identification.
    at org.quartz.simpl.RAMJobStore.storeTrigger(RAMJobStore.java:415)
    at org.quartz.simpl.RAMJobStore.storeJobAndTrigger(RAMJobStore.java:252)
    at org.quartz.core.QuartzScheduler.scheduleJob(QuartzScheduler.java:855)
    at org.quartz.impl.StdScheduler.scheduleJob(StdScheduler.java:249)
    at org.wso2.carbon.connector.framework.server.polling.PollingTaskRunner.start(PollingTaskRunner.java:74)
    at org.wso2.carbon.connector.framework.server.polling.PollingServerConnector.start(PollingServerConnector.java:57)
    at org.wso2.carbon.transport.remotefilesystem.server.connector.contractimpl.RemoteFileSystemServerConnectorImpl.start(RemoteFileSystemServerConnectorImpl.java:75)
    at org.wso2.extension.siddhi.io.file.FileSource.deployServers(FileSource.java:537)
    at org.wso2.extension.siddhi.io.file.FileSource.connect(FileSource.java:370)
    at org.wso2.siddhi.core.stream.input.source.Source.connectWithRetry(Source.java:130)
    at org.wso2.siddhi.core.SiddhiAppRuntime.start(SiddhiAppRuntime.java:335)
    at org.wso2.carbon.stream.processor.core.internal.StreamProcessorService.deploySiddhiApp(StreamProcessorService.java:280)
    at org.wso2.carbon.stream.processor.core.internal.StreamProcessorDeployer.deploySiddhiQLFile(StreamProcessorDeployer.java:81)
    at org.wso2.carbon.stream.processor.core.internal.StreamProcessorDeployer.deploy(StreamProcessorDeployer.java:170)
    at org.wso2.carbon.deployment.engine.internal.DeploymentEngine.lambda$deployArtifacts[=10=](DeploymentEngine.java:291)
    at java.util.ArrayList.forEach(ArrayList.java:1249)
    at org.wso2.carbon.deployment.engine.internal.DeploymentEngine.deployArtifacts(DeploymentEngine.java:282)
    at org.wso2.carbon.deployment.engine.internal.RepositoryScanner.sweep(RepositoryScanner.java:112)
    at org.wso2.carbon.deployment.engine.internal.RepositoryScanner.scan(RepositoryScanner.java:68)
    at org.wso2.carbon.deployment.engine.internal.DeploymentEngine.start(DeploymentEngine.java:121)
    at org.wso2.carbon.deployment.engine.internal.DeploymentEngineListenerComponent.onAllRequiredCapabilitiesAvailable(DeploymentEngineListenerComponent.java:216)
    at org.wso2.carbon.kernel.internal.startupresolver.StartupComponentManager.lambda$notifySatisfiableComponents(StartupComponentManager.java:266)
    at java.util.ArrayList.forEach(ArrayList.java:1249)
    at org.wso2.carbon.kernel.internal.startupresolver.StartupComponentManager.notifySatisfiableComponents(StartupComponentManager.java:252)
    at org.wso2.carbon.kernel.internal.startupresolver.StartupOrderResolver.run(StartupOrderResolver.java:204)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)

任何人都可以提出如何克服这个问题的想法?

谢谢。

感谢您指出这个问题。 似乎这是由于安排了两个具有相同 ID 的轮询任务而发生的。 我在 git 存储库 [1] 中为此创建了一个问题。该修复程序将很快随更新一起提供。

[1] https://github.com/wso2/product-sp/issues/463

此致!