How to combine multiple Orion entities into one CKAN dataset using Cygnus?

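Right now I'm trying to create a dataset for my Orion data that includes information from all entities. My setup works: when some data changes, Orion sends a notification to Cygnus, and Cygnus then adds the data to CKAN, as shown in the picture.

The problem is that Cygnus creates one CKAN resource per entity. What can I do to make Cygnus put all the data into a single resource? Thanks.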

Edit 1:

I just discovered that I can simplify things by using a single subscription in Orion:

{
    "entities": [
        {
            "type": "Room",
            "isPattern": "true",
            "id": "Room.*"
        }
    ] ...

Then I noticed the grouping feature in Cygnus (http://fiware-cygnus.readthedocs.io/en/develop/installation_and_administration_guide/grouping_rules_conf/index.html), but now I'm lost, because Cygnus refuses to load my grouping configuration file. It shows this error:

time=2016-05-03T05:32:29.658CDT | lvl=INFO | trans= | srv= | subsrv= | function=<init> | comp=Cygnus | msg=com.telefonica.iot.cygnus.interceptors.GroupingRules[58] : No grouping rules have been read
Exception in thread "Thread-1" java.lang.NullPointerException
    at java.io.File.<init>(File.java:277)
    at com.telefonica.iot.cygnus.interceptors.GroupingInterceptor$ConfigurationReader.run(GroupingInterceptor.java:244)

In my agent configuration file I have:

cygnusagent.sources.http-source.interceptors.gi.gropuing_rules_conf_file = /usr/cygnus/conf/grouping_rules.conf

And in grouping_rules.conf:

{
    "grouping_rules": [
        {
            "id": 1,
            "fields": [
                "entityId"
            ],
            "regex": "room.*",
            "destination": "Rooms",
            "fiware_service_path": "/myhouse"
        }
    ]
}

Edit 2:

I have all the lines the documentation asks for:

    cygnusagent.sources = http-source
    cygnusagent.sinks = ckan-sink
    cygnusagent.channels = ckan-channel

    cygnusagent.sources.http-source.channels = ckan-channel
    cygnusagent.sources.http-source.type = org.apache.flume.source.http.HTTPSource
    cygnusagent.sources.http-source.port = 5050
    cygnusagent.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.OrionRestHandler
    cygnusagent.sources.http-source.handler.notification_target = /notify
    cygnusagent.sources.http-source.handler.default_service = test
    cygnusagent.sources.http-source.handler.default_service_path = /myhouse
    cygnusagent.sources.http-source.handler.events_ttl = 5
    cygnusagent.sources.http-source.interceptors = ts gi
    cygnusagent.sources.http-source.interceptors.ts.type = timestamp
    cygnusagent.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.GroupingInterceptor$Builder
    cygnusagent.sources.http-source.interceptors.gi.gropuing_rules_conf_file = /usr/cygnus/conf/grouping_rules.conf
cygnusagent.channels.ckan-channel.type = memory
cygnusagent.channels.ckan-channel.capacity = 1000
cygnusagent.channels.ckan-channel.transactionCapacity = 100

# ============================================
# OrionCKANSink configuration
# channel name from where to read notification events
cygnusagent.sinks.ckan-sink.channel = ckan-channel

# sink class, must not be changed
cygnusagent.sinks.ckan-sink.type = com.telefonica.iot.cygnus.sinks.OrionCKANSink

# true if the grouping feature is enabled for this sink, false otherwise
cygnusagent.sinks.ckan-sink.enable_grouping = true

# true if lower case is wanted to forced in all the element names, false otherwise
cygnusagent.sinks.hdfs-sink.enable_lowercase = false

# the CKAN API key to use
cygnusagent.sinks.ckan-sink.api_key = 436fffc8-b397-478a-92fd-bbc5ffaf8269

# the FQDN/IP address for the CKAN API endpoint
cygnusagent.sinks.ckan-sink.ckan_host = ckan-demo.ckan.io

# the port for the CKAN API endpoint
cygnusagent.sinks.ckan-sink.ckan_port = 80

# Orion URL used to compose the resource URL with the convenience operation URL to query it
cygnusagent.sinks.ckan-sink.orion_url = http://localhost:1026

# how the attributes are stored, either per row either per column (row, column)
cygnusagent.sinks.ckan-sink.attr_persistence = column

# enable SSL for secure Http transportation; 'true' or 'false'
cygnusagent.sinks.ckan-sink.ssl = false

# number of notifications to be included within a processing batch
cygnusagent.sinks.ckan-sink.batch_size = 100

# timeout for batch accumulation
cygnusagent.sinks.ckan-sink.batch_timeout = 60

# number of retries upon persistence error
cygnusagent.sinks.ckan-sink.batch_ttl = 10

I think the file permissions are fine:

[root@VM013cen-Prod conf]# ls *.conf -l
-rwxrwxrwx 1 cygnus cygnus 2675 may  3 06:45 agent_test.conf
-rwxrwxrwx 1 cygnus cygnus  258 may  3 05:08 grouping_rules.conf
-rwxr-xr-x 1 cygnus cygnus  135 mar  1 02:50 krb5_login.conf

The tests pass:

Results :

Tests run: 80, Failures: 0, Errors: 0, Skipped: 0

    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 1:05.862s
    [INFO] Finished at: Tue May 03 06:53:44 CDT 2016
    [INFO] Final Memory: 41M/105M
    [INFO] ------------------------------------------------------------------------

The start command I use: /usr/cygnus/bin/cygnus-flume-ng agent --conf /usr/cygnus/conf/ -f /usr/cygnus/conf/agent_test.conf -n cygnusagent -Dflume.root.logger=INFO,console

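Yes, you are right: grouping rules are designed for exactly the use case you describe. Defining a simple rule whose regex matches the entity type of all the entities (assuming the type is always the same) should do the trick; alternatively, use a regex that matches the common part of the entity IDs, as you suggest.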
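To make the idea concrete, here is a minimal Python sketch of how such a rule could be evaluated. It is not Cygnus's actual code: the function name `matching_destination` is made up for illustration, the field names `entityType` and `servicePath` are assumptions (only `entityId` appears in your rule), and I am assuming the rule's fields are concatenated and the regex is applied case-sensitively.

```python
import re

def matching_destination(rules, entity_id, entity_type, service_path):
    """Return the destination of the first rule whose regex matches, else None."""
    # Values a rule may refer to; "entityType" and "servicePath" are assumed names.
    values = {
        "entityId": entity_id,
        "entityType": entity_type,
        "servicePath": service_path,
    }
    for rule in rules:
        # Concatenate the configured fields, then apply the rule's regex.
        concat = "".join(values[field] for field in rule["fields"])
        if re.search(rule["regex"], concat):
            return rule["destination"]
    return None

rules = [{
    "id": 1,
    "fields": ["entityId"],
    "regex": "Room.*",
    "destination": "Rooms",
    "fiware_service_path": "/myhouse",
}]

print(matching_destination(rules, "Room1", "Room", "/myhouse"))  # Rooms
print(matching_destination(rules, "Room2", "Room", "/myhouse"))  # Rooms
print(matching_destination(rules, "Car1", "Car", "/myhouse"))    # None
```

Under that case-sensitivity assumption, a pattern such as `room.*` would not match an entity ID such as `Room1`, so it may be worth making the regex in the rule agree with the `Room.*` pattern used in the subscription.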

Nevertheless, your problem seems to be related to the grouping rules configuration itself. You say your configuration contains:

cygnusagent.sources.http-source.interceptors.gi.gropuing_rules_conf_file = /usr/cygnus/conf/grouping_rules.conf

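Can you confirm that you have something like the following before that line (with your own agent name, cygnusagent, in place of cygnus-ngsi)?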

cygnus-ngsi.sources.http-source.interceptors = ts gi
cygnus-ngsi.sources.http-source.interceptors.ts.type = timestamp
cygnus-ngsi.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.GroupingInterceptor$Builder

Also, could you check the permissions of /usr/cygnus/conf/grouping_rules.conf? Maybe the file cannot be read by the user running Cygnus.
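One quick way to check this is a small Python sketch like the one below. The helper name `readable_by_current_user` is hypothetical, and the check is only meaningful if the script is run as the same user that runs Cygnus (running it as root will almost always report the file as readable).

```python
import os

def readable_by_current_user(path):
    """True if path is an existing regular file the current user can read."""
    return os.path.isfile(path) and os.access(path, os.R_OK)

# Path taken from the agent configuration in the question.
print(readable_by_current_user("/usr/cygnus/conf/grouping_rules.conf"))
```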

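Regarding the rule, the new FIWARE service path should start with /, since all FIWARE service paths must start with a slash (I think this was not properly documented in the previous version).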
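If it helps, the rules file can be sanity-checked before starting Cygnus. The following is only a sketch: `check_grouping_rules` is a made-up helper, and the set of required keys is simply the set of keys used in your own grouping_rules.conf, which I am assuming are all mandatory.

```python
import json

# Keys taken from the rule in the question; assumed to be all required.
REQUIRED_KEYS = {"id", "fields", "regex", "destination", "fiware_service_path"}

def check_grouping_rules(text):
    """Return a list of human-readable problems found in a grouping rules document."""
    problems = []
    doc = json.loads(text)  # raises ValueError if the file is not valid JSON
    for index, rule in enumerate(doc.get("grouping_rules", [])):
        missing = REQUIRED_KEYS - rule.keys()
        if missing:
            problems.append(f"rule {index}: missing keys {sorted(missing)}")
        path = rule.get("fiware_service_path", "")
        if not path.startswith("/"):
            problems.append(f"rule {index}: fiware_service_path must start with '/'")
    return problems

sample = """
{
    "grouping_rules": [
        {
            "id": 1,
            "fields": ["entityId"],
            "regex": "room.*",
            "destination": "Rooms",
            "fiware_service_path": "myhouse"
        }
    ]
}
"""
print(check_grouping_rules(sample))
```

The sample deliberately omits the leading slash, so the check reports one problem for rule 0; with "/myhouse" the returned list is empty.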

HTH!

Edit 1:

I think I have found the problem. The parameter name in your configuration is "...gropuing..." while the correct name is "...grouping..." :)
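A misspelled key is presumably just ignored, so the interceptor would never receive a file path; that would be consistent with the NullPointerException raised in java.io.File.<init>, which is what that constructor throws for a null path. Typos like this are easy to miss by eye, so here is a small Python sketch that looks for near-misses of an expected parameter name in an agent configuration. The helpers `parse_properties` and `find_near_misses` are hypothetical names, and the parser only handles simple `key = value` lines.

```python
import difflib

def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blanks and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props

def find_near_misses(props, expected_suffix, cutoff=0.8):
    """Return parameter names that look like typos of expected_suffix.

    Returns an empty list when the expected name is present as written.
    """
    suffixes = {key.rsplit(".", 1)[-1] for key in props}
    if expected_suffix in suffixes:
        return []
    return difflib.get_close_matches(expected_suffix, sorted(suffixes), n=3, cutoff=cutoff)

config = """
cygnusagent.sources.http-source.interceptors = ts gi
cygnusagent.sources.http-source.interceptors.ts.type = timestamp
cygnusagent.sources.http-source.interceptors.gi.gropuing_rules_conf_file = /usr/cygnus/conf/grouping_rules.conf
"""
print(find_near_misses(parse_properties(config), "grouping_rules_conf_file"))
```

Run against the lines from your agent file, this reports gropuing_rules_conf_file as the suspicious name; once the key is spelled correctly, the list comes back empty.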