Cygnus Cosmos integration

We managed to integrate the idas-cygnus-orion flow, and everything works well. (Thanks to Francisco for the help.)

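Now we want to add Cosmos to the integration. I have configured Cygnus following the answer you posted here (the parts starting with * were added for the Cosmos configuration).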

When I start Cygnus, we get this error message:

        [root@idas-new centos]# tail -500f nohup.out
Warning: No configuration directory set! Use --conf <dir> to override.
Warning: JAVA_HOME is not set!
+ exec /usr/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp '/usr/cygnus/lib/*:/usr/cygnus/plugins.d/cygnus/lib/*:/usr/cygnus/agent_test.conf -n cygnusagent
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/cygnus/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/cygnus/plugins.d/cygnus/lib/cygnus-0.12.0_SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
16/05/18 09:26:46 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
16/05/18 09:26:46 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/usr/cygnus/conf/agent_test.con
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Processing:hdfs-sink
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Processing:hdfs-sink
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Processing:hdfs-sink
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Processing:hdfs-sink
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Processing:hdfs-sink
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Processing:hdfs-sink
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Processing:hdfs-sink
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Processing:hdfs-sink
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Added sinks: hdfs-sink Agent: cygnusagent
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Processing:hdfs-sink
16/05/18 09:26:46 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [cygnusagent]
16/05/18 09:26:46 INFO node.AbstractConfigurationProvider: Creating channels
16/05/18 09:26:46 INFO channel.DefaultChannelFactory: Creating instance of channel hdfs-channel type memory
16/05/18 09:26:46 INFO node.AbstractConfigurationProvider: Created channel hdfs-channel
16/05/18 09:26:46 INFO source.DefaultSourceFactory: Creating instance of source http-source, type org.apache.flume.source.http.HTTPS
16/05/18 09:26:46 INFO handlers.OrionRestHandler: Cygnus version (0.12.0_SNAPSHOT.d6ee0e4548ca6bf9116c1dee4c4cbcc7da8b1752)
16/05/18 09:26:46 INFO handlers.OrionRestHandler: Startup completed
16/05/18 09:26:46 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfs-sink, type: com.telefonica.iot.cygnus.sinks.OrionHDF
16/05/18 09:26:46 ERROR node.AbstractConfigurationProvider: Sink hdfs-sink has been removed due to an error during configuration
java.lang.NullPointerException
        at com.telefonica.iot.cygnus.sinks.OrionHDFSSink.configure(OrionHDFSSink.java:339)
        at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:418)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurati
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
16/05/18 09:26:46 INFO node.AbstractConfigurationProvider: Channel hdfs-channel connected to [http-source]
16/05/18 09:26:46 INFO node.Application: Starting new configuration:{ sourceRunners:{http-source=EventDrivenSourceRunner: { source:oyChannel{name: hdfs-channel}} }
16/05/18 09:26:46 INFO node.Application: Starting Channel hdfs-channel
16/05/18 09:26:46 INFO instrumentation.MonitoredCounterGroup: Monitoried counter group for type: CHANNEL, name: hdfs-channel, regist
16/05/18 09:26:46 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: hdfs-channel started
16/05/18 09:26:46 INFO node.Application: Starting Source http-source
16/05/18 09:26:46 INFO interceptors.GroupingRules: No grouping rules have been read
Exception in thread "Thread-1" java.lang.NullPointerException
        at java.io.File.<init>(Unknown Source)
        at com.telefonica.iot.cygnus.interceptors.GroupingInterceptor$ConfigurationReader.run(GroupingInterceptor.java:244)
16/05/18 09:26:46 INFO mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
16/05/18 09:26:46 INFO mortbay.log: jetty-6.1.26
16/05/18 09:26:47 INFO mortbay.log: Started SocketConnector@0.0.0.0:5050

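We do have this grouping_rules.conf file at that path, but everything inside it is commented out (because we did not need the file for the CKAN integration). Do we need to edit this file for Cosmos? (If so, how?)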

Could you help us solve this problem?

Thank you very much, Omar

cygnusagent.sources = http-source
cygnusagent.sinks = ckan-sink hdfs-sink
cygnusagent.channels = ckan-channel hdfs-channel

cygnusagent.sources.http-source.channels = ckan-channel hdfs-channel
cygnusagent.sources.http-source.type = org.apache.flume.source.http.HTTPSource
cygnusagent.sources.http-source.port = 5050
cygnusagent.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.OrionRestHandler
cygnusagent.sources.http-source.handler.notification_target = /notify
cygnusagent.sources.http-source.handler.events_ttl = 2
cygnusagent.sources.http-source.interceptors = ts gi
cygnusagent.sources.http-source.interceptors.ts.type = timestamp
cygnusagent.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.GroupingInterceptor$Builder
cygnusagent.sources.http-source.interceptors.gi.gropuing_rules_conf_file = /etc/cygnus/conf/grouping_rules.conf

cygnusagent.channels.ckan-channel.type = memory
cygnusagent.channels.ckan-channel.capacity = 1000
cygnusagent.channels.ckan-channel.transactionCapacity = 100

cygnusagent.sinks.ckan-sink.channel = ckan-channel
cygnusagent.sinks.ckan-sink.type = com.telefonica.iot.cygnus.sinks.OrionCKANSink
cygnusagent.sinks.ckan-sink.enable_grouping = false
cygnusagent.sinks.ckan-sink.api_key = XXXXXXXXXX
cygnusagent.sinks.ckan-sink.ckan_host = data.lab.fiware.org
cygnusagent.sinks.ckan-sink.ckan_port = 443
cygnusagent.sinks.ckan-sink.orion_url = localhost:1026
cygnusagent.sinks.ckan-sink.attr_persistence = row
cygnusagent.sinks.ckan-sink.ssl = true
cygnusagent.sinks.ckan-sink.batch_size = 1
cygnusagent.sinks.ckan-sink.batch_timeout = 10

cygnusagent.sinks.hdfs-sink.type = com.telefonica.iot.cygnus.sinks.OrionHDFSSink
cygnusagent.sinks.hdfs-sink.channel = hdfs-channel
cygnusagent.sinks.hdfs-sink.enable_grouping = false
cygnusagent.sinks.hdfs-sink.hdfs_host = cosmos.lab.fiware.org
cygnusagent.sinks.hdfs-sink.hdfs_port = 14000
cygnusagent.sinks.hdfs-sink.hdfs_username = XXXXX
cygnusagent.sinks.hdfs-sink.hdfs_password = XXX
cygnusagent.sinks.hdfs-sink.oauth2_token = XXXXXXXXX
cygnusagent.sinks.hdfs-sink.file_format = json-column


cygnusagent.channels.hdfs-channel.type = memory
cygnusagent.channels.hdfs-channel.capacity = 1000
cygnusagent.channels.hdfs-channel.transactionCapacity = 100

The configuration looks fine, except for this line:

cygnusagent.sources.http-source.interceptors.gi.gropuing_rules_conf_file = /etc/cygnus/conf/grouping_rules.
conf
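  • gropuing -> grouping (this probably comes from the quick start guide, which contained this typo until we fixed it).
  • The "conf" part of the file name appears to be on a second line. Flume configuration does not like that; it is best to keep everything belonging to a parameter on a single line.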
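
Both points can be checked mechanically before restarting the agent. A misspelled key is presumably never read by the interceptor, so the path would still be null when it reaches java.io.File, which would match the second stack trace in the log. The following is a minimal sketch, not part of Cygnus or Flume: the function name lint_flume_properties and the typo list are hypothetical, and it assumes every parameter is written as key = value on one line, as in the configuration above.

```python
# Hypothetical helper, not part of Cygnus or Flume.
# Assumption: every non-comment line has the form "key = value".
KNOWN_TYPOS = ("gropuing",)


def lint_flume_properties(text):
    """Return a list of (line_number, message) for suspicious lines."""
    problems = []
    for number, raw in enumerate(text.splitlines(), start=1):
        line = raw.strip()
        # Blank lines and comments are fine.
        if not line or line.startswith("#"):
            continue
        # A non-comment line without '=' is usually a value that
        # wrapped onto a new line.
        if "=" not in line:
            problems.append(
                (number, "no '=' found; value may have wrapped from the previous line")
            )
            continue
        key = line.split("=", 1)[0].strip()
        for typo in KNOWN_TYPOS:
            if typo in key:
                problems.append(
                    (number, f"key contains '{typo}'; did you mean 'grouping'?")
                )
    return problems


# The line as quoted above, with the typo and the wrapped file name.
BROKEN = (
    "cygnusagent.sources.http-source.interceptors.gi.gropuing_rules_conf_file"
    " = /etc/cygnus/conf/grouping_rules.\n"
    "conf\n"
)

# The same line with both fixes applied.
FIXED = (
    "cygnusagent.sources.http-source.interceptors.gi.grouping_rules_conf_file"
    " = /etc/cygnus/conf/grouping_rules.conf\n"
)

if __name__ == "__main__":
    for number, message in lint_flume_properties(BROKEN):
        print(f"line {number}: {message}")
    print(lint_flume_properties(FIXED))
```

The FIXED string is the line with both corrections applied. To check a whole agent file, pass its contents to the function, for example lint_flume_properties(open(path).read()), where path points at your own configuration file.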

Try this change and let us know how it goes.