Fiware - Cygnus: Issue with collection names

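I'm experiencing a strange behavior in the Cygnus module. I'm using Context Broker version 0.28 and Cygnus version 0.13. Suppose I have already loaded into the CB (Context Broker) some entities similar to this one: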

(curl 172.21.0.33:1026/v1/updateContext -s -S \
--header 'Fiware-Service: cb33_1003_06' \
--header 'Fiware-ServicePath: /Lugar' \
--header 'Content-Type: application/json' \
--header 'Accept: application/json' -d @- | python -mjson.tool) <<EOF

    {
        "contextElements": [
            {
                "type": "Acceso_Wifi",
                "isPattern": "false",
                "id": "AP_1.3",
                "attributes": [
                    {
                        "name": "temperature",
                        "type": "float",
                        "value": "25"
                    },
                    {
                        "name": "pressure",
                        "type": "integer",
                        "value": "1"
                    },
                    {
                        "name": "position",
                        "type": "coords",
                        "value": "13.322326, -1.983824",
                        "metadatas": [
                            {
                                "name": "location",
                                "type": "string",
                                "value": "WGS84"
                            }
                        ]
                    }
                ]
            }
        ],
        "updateAction": "APPEND"
    }

EOF

Now I subscribe Cygnus to the CB:

(curl 172.21.0.33:1026/v1/subscribeContext -s -S \
--header 'Fiware-Service: cb33_1003_06' \
--header 'Fiware-ServicePath: /Lugar' \
--header 'Content-Type: application/json' \
--header 'Accept: application/json' -d @- | python -mjson.tool) <<EOF
{
    "entities": [
        {
            "type": "Acceso_Wifi",
            "isPattern": "true",
            "id": "AP_.*"
        }
    ],
    "attributes": [
        "temperature",
        "pressure"
    ],
    "reference": "http://172.21.0.33:5050/notify",
    "duration": "P1M",
    "notifyConditions": [
        {
            "type": "ONCHANGE",
            "condValues": []
        }
    ],
    "throttling": "PT1S"
}
EOF

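The CB will notify Cygnus about every matching entity previously loaded in the CB database. Suppose 3 entities match the subscription. In that case, a collection is created in Cygnus for each of these entities (I'm using the default dm-by-entity data model). The problem is that the collection names are malformed: the Fiware-ServicePath is concatenated once per entity in every collection name: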

cygnus2_/Lugar__Lugar__Lugar_AP_1.1_Acceso_Wifi
cygnus2_/Lugar__Lugar__Lugar_AP_1.2_Acceso_Wifi
cygnus2_/Lugar__Lugar__Lugar_AP_1.3_Acceso_Wifi
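The names above can be reproduced with a small model of how, in dm-by-entity mode, the collection name appears to be built: collection_prefix + service path + "_" + entity id + "_" + entity type. This is a hypothetical reconstruction inferred only from the names shown here, not Cygnus 0.13 source code. The function names, the assumption that the multi-valued header is comma-joined with no spaces, and the rule that every "/" or "," after the first character becomes "_" are all assumptions, chosen because they reproduce the observed output.

```python
# Hypothetical model of dm-by-entity collection naming, inferred from the
# observed names; NOT taken from the Cygnus source code.

def encode_service_path(service_path: str) -> str:
    """Keep the leading '/', turn every later '/' or ',' into '_' (assumed rule)."""
    head, tail = service_path[:1], service_path[1:]
    return head + tail.replace(",", "_").replace("/", "_")


def collection_name(prefix: str, service_path: str, entity_id: str, entity_type: str) -> str:
    """prefix + encoded service path + '_' + entity id + '_' + entity type."""
    return f"{prefix}{encode_service_path(service_path)}_{entity_id}_{entity_type}"


# Initial notification: one multi-valued header shared by the three entities
# (assumed to be comma-joined, one '/Lugar' per context element).
initial_header = ",".join(["/Lugar"] * 3)
for entity_id in ("AP_1.1", "AP_1.2", "AP_1.3"):
    print(collection_name("cygnus2_", initial_header, entity_id, "Acceso_Wifi"))

# A later update notification carries a single-valued header.
print(collection_name("cygnus2_", "/Lugar", "AP_1.1", "Acceso_Wifi"))
```

Under this model the only difference between the wrong and the right names is the header value, which is consistent with the answer below.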

If we then update one of the entities, a new collection with the correct name is created:

cygnus2_/Lugar_AP_1.1_Acceso_Wifi
cygnus2_/Lugar__Lugar__Lugar_AP_1.1_Acceso_Wifi
cygnus2_/Lugar__Lugar__Lugar_AP_1.2_Acceso_Wifi
cygnus2_/Lugar__Lugar__Lugar_AP_1.3_Acceso_Wifi

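In this example there are only 3 entities, but as the number of matching entities grows, the collection names get longer and longer. Once the concatenated Fiware-ServicePath exceeds 50 characters, an exception is logged and the collection is not created: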

org.apache.flume.source.http.HTTPBadRequestException: 'fiware-servicePath' header length greater than 50)
        at com.telefonica.iot.cygnus.handlers.OrionRestHandler.getEvents(OrionRestHandler.java:209)
        at org.apache.flume.source.http.HTTPSource$FlumeHTTPServlet.doPost(HTTPSource.java:184)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:725)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:814)
        at org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511)
        at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:401)
        at org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182)
        at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:766)
        at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
        at org.mortbay.jetty.Server.handle(Server.java:326)
        at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542)
        at org.mortbay.jetty.HttpConnection$RequestHandler.content(HttpConnection.java:945)
        at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:756)
        at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:218)
        at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404)
        at org.mortbay.jetty.bio.SocketConnector$Connection.run(SocketConnector.java:228)
        at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)
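Assuming, as above, that the first notification carries one comma-joined "/Lugar" per matching entity, the header grows by about 7 characters per entity, so under that assumption the 50-character limit reported in the exception would first be exceeded when the subscription matches 8 entities. The sketch below only does that arithmetic: the limit comes from the exception message, while the joining rule and the function names are hypothetical.

```python
# Arithmetic sketch: length of an assumed comma-joined multi-valued
# Fiware-ServicePath header versus the 50-character limit in the exception.

MAX_SERVICE_PATH_LEN = 50  # value taken from the exception message above


def multi_valued_header(service_path: str, matching_entities: int) -> str:
    """Assumed shape of the first notification's header: one path per entity."""
    return ",".join([service_path] * matching_entities)


def first_rejected_count(service_path: str) -> int:
    """Smallest number of matching entities whose header exceeds the limit."""
    n = 1
    while len(multi_valued_header(service_path, n)) <= MAX_SERVICE_PATH_LEN:
        n += 1
    return n


for n in (3, 7, 8):
    print(n, len(multi_valued_header("/Lugar", n)))
print("rejected from", first_rejected_count("/Lugar"), "entities")
```

With a longer service path the threshold would be reached with even fewer entities.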

Is this behavior caused by a misconfiguration, or is it a problem in Cygnus?

This is the OrionMongoSink configuration:

# OrionMongoSink configuration
# sink class, must not be changed
cygnusagent.sinks.mongo-sink.type = com.telefonica.iot.cygnus.sinks.OrionMongoSink
# channel name from where to read notification events
cygnusagent.sinks.mongo-sink.channel = mongo-channel
# true if the grouping feature is enabled for this sink, false otherwise
cygnusagent.sinks.mongo-sink.enable_grouping = false
# true if lower case is wanted to be forced in all the element names, false otherwise
cygnusagent.sinks.mongo-sink.enable_lowercase = false
# FQDN/IP:port where the MongoDB server runs (standalone case) or comma-separated list of FQDN/IP:port pairs where the MongoDB replica set members run
cygnusagent.sinks.mongo-sink.mongo_hosts = mongodb1.spider.net:27017,mongodb2.spider.net:27017,mongodb3.spider.net:27017
###cygnusagent.sinks.mongo-sink.mongo_hosts = 172.21.0.25:27017,172.21.0.26:27017,172.21.0.28:27017
# a valid user in the MongoDB server (or empty if authentication is not enabled in MongoDB)
cygnusagent.sinks.mongo-sink.mongo_username =
# password for the user above (or empty if authentication is not enabled in MongoDB)
cygnusagent.sinks.mongo-sink.mongo_password =
# prefix for the MongoDB databases
cygnusagent.sinks.mongo-sink.db_prefix = cygnus2_
# prefix for the MongoDB collections
cygnusagent.sinks.mongo-sink.collection_prefix = cygnus2_
# true if collection names are based on a hash, false for human-readable collections
cygnusagent.sinks.mongo-sink.should_hash = false
# Must be dm-by-service-path or dm-by-entity
cygnusagent.sinks.mongo-sink.data_model = dm-by-entity
# how the attributes are stored, either per row or per column (row, column)
cygnusagent.sinks.mongo-sink.attr_persistence = column
# number of notifications to be included within a processing batch
cygnusagent.sinks.mongo-sink.batch_size = 1
# timeout for batch accumulation
cygnusagent.sinks.mongo-sink.batch_timeout = 30
# number of retries upon persistence error
cygnusagent.sinks.mongo-sink.batch_ttl = 10
# Collections will be removed if older than the value specified in seconds. Set to 0 to disable this policy.
cygnusagent.sinks.mongo-sink.data_expiration = 0
# The oldest data (according to insertion time) will be removed if the size of the data collection gets bigger than the value specified in bytes. Minimum value (other than 0) is 4096 bytes.
cygnusagent.sinks.mongo-sink.collection_size = 0
# The oldest data (according to insertion time) will be removed if the number of documents in the data collections goes beyond the specified value. Set to 0 to disable this policy.
cygnusagent.sinks.mongo-sink.max_documents = 0

And the rest of the agent configuration:

# source configuration
# channel name where to write the notification events
#####cygnusagent.sources.http-source.channels = hdfs-channel mysql-channel postgresql-channel ckan-channel mongo-channel sth-channel kafka-channel
cygnusagent.sources.http-source.channels = mongo-channel
# source class, must not be changed
cygnusagent.sources.http-source.type = org.apache.flume.source.http.HTTPSource
# listening port the Flume source will use for receiving incoming notifications
cygnusagent.sources.http-source.port = 5050
# Flume handler that will parse the notifications, must not be changed
cygnusagent.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.OrionRestHandler
# URL target
cygnusagent.sources.http-source.handler.notification_target = /notify
# Default service (service semantic depends on the persistence sink)
cygnusagent.sources.http-source.handler.default_service = def_serv
# Default service path (service path semantic depends on the persistence sink)
cygnusagent.sources.http-source.handler.default_service_path = def_serv_path
# Number of channel re-injection retries before a Flume event is definitely discarded (-1 means infinite retries)
cygnusagent.sources.http-source.handler.events_ttl = -1
# Source interceptors, do not change
cygnusagent.sources.http-source.interceptors = ts gi
# TimestampInterceptor, do not change
cygnusagent.sources.http-source.interceptors.ts.type = timestamp
# GroupingInterceptor, do not change
cygnusagent.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.GroupingInterceptor$Builder
# Grouping rules for the GroupingInterceptor, put the right absolute path to the file if necessary
# See the doc/design/interceptors document for more details
#####cygnusagent.sources.http-source.interceptors.gi.grouping_rules_conf_file = /usr/cygnus/conf/grouping_rules.conf

The channel configuration:

# mongo-channel configuration
# channel type (must not be changed)
cygnusagent.channels.mongo-channel.type = memory
# capacity of the channel
cygnusagent.channels.mongo-channel.capacity = 10000
# amount of bytes that can be sent per transaction
cygnusagent.channels.mongo-channel.transactionCapacity = 100

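After talking with the Orion experts, this seems to be a problem with the initial notification, i.e., the one sent when the subscription is created: in that case a multi-valued fiware-servicePath header is sent. After that, however, each subsequent notification should contain a single context element and, therefore, a single-valued fiware-servicePath header.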
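To make the two cases concrete, here is a minimal sketch of both notifications as Python dicts. It assumes the NGSI v1 notifyContextRequest layout (subscriptionId, originator, contextResponses with contextElement and statusCode) and a comma-joined header; the subscription id is a hypothetical placeholder and the values are illustrative, not captured traffic.

```python
# Sketch of the two notification shapes described above, using an assumed
# NGSI v1 notifyContextRequest layout; ids and values are illustrative only.

def context_response(entity_id: str) -> dict:
    return {
        "contextElement": {
            "type": "Acceso_Wifi",
            "isPattern": "false",
            "id": entity_id,
            "attributes": [{"name": "temperature", "type": "float", "value": "25"}],
        },
        "statusCode": {"code": "200", "reasonPhrase": "OK"},
    }


# First notification, sent when the subscription is created: every matching
# entity in one request, with a multi-valued (comma-separated) header.
initial = {
    "headers": {"Fiware-Service": "cb33_1003_06",
                "Fiware-ServicePath": "/Lugar,/Lugar,/Lugar"},
    "body": {"subscriptionId": "hypothetical-sub-id",
             "originator": "localhost",
             "contextResponses": [context_response(i) for i in ("AP_1.1", "AP_1.2", "AP_1.3")]},
}

# Later notification triggered by an update: one entity, one service path.
update = {
    "headers": {"Fiware-Service": "cb33_1003_06", "Fiware-ServicePath": "/Lugar"},
    "body": {"subscriptionId": "hypothetical-sub-id",
             "originator": "localhost",
             "contextResponses": [context_response("AP_1.1")]},
}


def is_multi_valued(notification: dict) -> bool:
    """True when the header carries more than one comma-separated service path."""
    return len(notification["headers"]["Fiware-ServicePath"].split(",")) > 1


for name, n in (("initial", initial), ("update", update)):
    print(name, len(n["body"]["contextResponses"]), is_multi_valued(n))
```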

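This needs to be fixed in the code, either by ignoring the first notification (which, by the way, is useless) or by supporting multi-valued service path headers. In the meantime, the workaround is to make the subscriptions while Cygnus is stopped, and to start Cygnus once all the subscriptions are done.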
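Both fixes can be sketched in a few lines. This is a hypothetical Python illustration of the idea, not the OrionRestHandler code; the function names are made up, and pairing the i-th service path with the i-th context element is an assumption about how a multi-valued header would be interpreted.

```python
# Hypothetical sketch of the two fixes mentioned above; this is NOT the
# OrionRestHandler code, just an illustration of the idea in Python.

def split_notification(service_path_header: str, context_responses: list) -> list:
    """Support multi-valued headers by pairing the i-th service path with
    the i-th context element (the positional pairing is an assumption)."""
    paths = [p.strip() for p in service_path_header.split(",")]
    if len(paths) == 1:
        # Single-valued header: it applies to every context element.
        paths = paths * len(context_responses)
    if len(paths) != len(context_responses):
        raise ValueError("service path count does not match context elements")
    return [(path, cr["contextElement"]) for path, cr in zip(paths, context_responses)]


def should_ignore(service_path_header: str) -> bool:
    """Alternative fix: drop the (useless) initial multi-valued notification."""
    return "," in service_path_header


responses = [{"contextElement": {"id": i, "type": "Acceso_Wifi"}}
             for i in ("AP_1.1", "AP_1.2", "AP_1.3")]
for path, element in split_notification("/Lugar,/Lugar,/Lugar", responses):
    print(path, element["id"])
print(should_ignore("/Lugar,/Lugar,/Lugar"), should_ignore("/Lugar"))
```

Splitting keeps the data from the first notification, while ignoring it is simpler; both avoid building a collection name from the concatenated header.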

Edit 1:

I have opened this issue in the Cygnus GitHub repository: https://github.com/telefonicaid/fiware-cygnus/issues/923