How to properly set up column mode in Cygnus?
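I have Cygnus working nicely in row mode, where everything is automatic. Now I want to switch to column mode.
1. This means I now have to set up the tables manually, with proper column types, right?
2. I assume the column types for the attributes should correspond to the types I specify in the context broker (i.e. temperature: float; pressure: integer; and so on), right?
3. But what about the types of these columns:
   - recvTime (datetime, I am guessing?)
   - fiwareservicepath (string?)
   - entityId (integer?)
   - entityType (string?)
   - temperature_md (float, same as temperature, or something else?)
   - pressure_md (integer, same as pressure, or something else?)
4. Also, I could really do without the following columns:
   - the _md columns
   - fiwareservicepath
   Can I remove those?
5. Lastly, where is the primary key in this scenario? Can I just add an ID column manually and set it to auto-increment without conflicting with Cygnus?
EDIT1: I tried the types I presumed in step 3 and got the following output in the log: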
11 Dec 2015 15:22:12,783 INFO [conf-file-poller-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:167) - Shutdown Metric for type: CHANNEL, name: mysql-channel. channel.event.take.attempt == 1
11 Dec 2015 15:22:12,784 INFO [conf-file-poller-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:167) - Shutdown Metric for type: CHANNEL, name: mysql-channel. channel.event.take.success == 0
11 Dec 2015 15:22:12,785 INFO [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:138) - Starting new configuration:{ sourceRunners:{http-source=EventDrivenSourceRunner: { source:org.apache.flume.source.http.HTTPSource{name:http-source,state:START} }} sinkRunners:{mysql-sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2e5babc counterGroup:{ name:null counters:{runner.interruptions=1, runner.backoffs.consecutive=1, runner.backoffs=1} } }} channels:{mysql-channel=org.apache.flume.channel.MemoryChannel{name: mysql-channel}} }
11 Dec 2015 15:22:12,787 INFO [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:145) - Starting Channel mysql-channel
11 Dec 2015 15:22:12,789 INFO [lifecycleSupervisor-1-4] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:94) - Component type: CHANNEL, name: mysql-channel started
11 Dec 2015 15:22:12,792 INFO [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:173) - Starting Sink mysql-sink
11 Dec 2015 15:22:12,793 INFO [lifecycleSupervisor-1-2] (com.telefonica.iot.cygnus.sinks.OrionMySQLSink.start:152) - [mysql-sink] Startup completed
11 Dec 2015 15:22:12,794 INFO [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:184) - Starting Source http-source
11 Dec 2015 15:22:12,800 INFO [lifecycleSupervisor-1-1] (com.telefonica.iot.cygnus.interceptors.GroupingInterceptor.initialize:92) - Grouping rules read:
11 Dec 2015 15:22:12,802 ERROR [lifecycleSupervisor-1-1] (com.telefonica.iot.cygnus.interceptors.GroupingInterceptor.parseGroupingRules:165) - Error while parsing the Json-based grouping rules file. Details=null
11 Dec 2015 15:22:12,803 WARN [lifecycleSupervisor-1-1] (com.telefonica.iot.cygnus.interceptors.GroupingInterceptor.initialize:98) - Grouping rules syntax has errors
11 Dec 2015 15:22:12,804 INFO [lifecycleSupervisor-1-1] (org.mortbay.log.Slf4jLog.info:67) - jetty-6.1.26
11 Dec 2015 15:22:12,809 INFO [lifecycleSupervisor-1-1] (org.mortbay.log.Slf4jLog.info:67) - Started SocketConnector@0.0.0.0:5050
11 Dec 2015 15:22:12,810 INFO [lifecycleSupervisor-1-1] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:94) - Component type: SOURCE, name: http-source started
11 Dec 2015 15:22:46,806 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:246) - Batch accumulation time reached, the batch will be processed as it is
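6. So I am guessing I should do something about the grouping rules, but I don't know what. I tried the link provided in grouping_rules.conf, but it returns a 404.
EDIT2: This is the create table script I am using: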
CREATE TABLE sensor_room1_room (
sensor_room1_roomID INT NOT NULL AUTO_INCREMENT,
recvTime varchar(40),
fiwareservicepath varchar(40),
entityId int (10),
entityType varchar (40),
pressure int (3),
pressure_md int(3),
temperature float (5),
temperature_md float(5),
PRIMARY KEY (sensor_room1_roomID));
EDIT3: Something is wrong here and I can't put my finger on it. Here is my table structure:
mysql> describe sensor;
+-------------------+------------+------+-----+---------+-------+
| Field             | Type       | Null | Key | Default | Extra |
+-------------------+------------+------+-----+---------+-------+
| recvTime          | mediumtext | YES  |     | NULL    |       |
| fiwareservicepath | text       | YES  |     | NULL    |       |
| entityId          | text       | YES  |     | NULL    |       |
| entityType        | text       | YES  |     | NULL    |       |
| pressure          | text       | YES  |     | NULL    |       |
| pressure_md       | text       | YES  |     | NULL    |       |
| temperature       | text       | YES  |     | NULL    |       |
| temperature_md    | text       | YES  |     | NULL    |       |
+-------------------+------------+------+-----+---------+-------+
8 rows in set (0.00 sec)
I am sending this NGSI command:
(curl localhost:1026/v1/updateContext -s -S --header 'Content-Type: application/json' \
--header 'Accept: application/json' -d @- | python -mjson.tool) <<EOF
{
"contextElements": [
{
"type": "Room",
"isPattern": "false",
"id": "Room1",
"attributes": [
{
"name": "temperature",
"type": "float",
"value": "321"
},
{
"name": "pressure",
"type": "integer",
"value": "123"
}
]
}
],
"updateAction": "APPEND"
}
EOF
And I am getting this error in the log:
14 Dec 2015 14:42:46,248 INFO [1161924167@qtp-1635328039-1] (com.telefonica.iot.cygnus.handlers.OrionRestHandler.getEvents:255) - Event put in the channel (id=110985483, ttl=10)
14 Dec 2015 14:43:09,258 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:246) - Batch accumulation time reached, the batch will be processed as it is
14 Dec 2015 14:43:09,266 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionMySQLSink.persistAggregation:429) - [mysql-sink] Persisting data at OrionMySQLSink. Database (trace_data), Table (sensor), Fields ((recvTime,fiwareservicepath,entityId,entityType,temperature,temperature_md)), Values (('2015-12-14T13:42:46.206Z','sensor','Room1','Room','321','[]'),('2015-12-14T13:42:46.170Z','sensor','Room1','Room','321','[]'),('2015-12-14T13:42:46.220Z','sensor','Room1','Room','321','[]'),('2015-12-14T13:42:46.223Z','sensor','Room1','Room','321','[]'),('2015-12-14T13:42:46.225Z','sensor','Room1','Room','321','[]'),('2015-12-14T13:42:46.228Z','sensor','Room1','Room','123','[]','321','[]'),('2015-12-14T13:42:46.248Z','sensor','Room1','Room','123','[]','321','[]'))
14 Dec 2015 14:43:09,300 WARN [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:318) - Bad context data (Column count doesn't match value count at row 6)
14 Dec 2015 14:43:09,300 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:323) - Finishing transaction (1450100555-186-0000000001,1450100555-186-0000000000,1450100555-186-0000000002,1450100555-186-0000000003,1450100555-186-0000000005,1450100555-186-0000000004,1450100555-186-0000000006)
14 Dec 2015 14:43:39,305 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:246) - Batch accumulation time reached, the batch will be processed as it is
14 Dec 2015 14:43:39,305 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:295) - Finishing transaction ()
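7. There is a conflict here (Column count doesn't match value count at row 6) that I can't resolve. Any suggestions?
8. I still need to include a table ID in this, but I don't know how.
To be clear, MySQL is the only database I am using here.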
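I'll answer your questions point by point:
1. Yes. When working in column mode with OrionMySQLSink, the tables must be provisioned in advance. You can find an explanation here.
2. It is not mandatory, because the types declared in Orion Context Broker mean nothing to the broker itself: they have no real semantics, although they do have a meaning for you, the user. I mean, you can say the temperature attribute has the type "float", "centigrade degrees" or "potato". That is why the column mode of OrionMySQLSink cannot create the tables automatically. In any case, the MySQL driver will cast the attribute values to the MySQL types you declared in the table definition. Can you edit the question to show your table creation command?
3. The type for each column:
   - recvTime --> usually DATETIME, but TEXT or VARCHAR also work, depending on how you want to use the value.
   - fiwareservicepath --> usually TEXT, but it can be INTEGER, depending on whether it is a string or a number.
   - entityId --> usually TEXT, but it can be INTEGER, depending on whether it is a string or a number.
   - entityType --> usually TEXT, but it can be INTEGER, depending on whether it is a string or a number.
   - temperature_md --> usually TEXT, because it can hold a JSON value, although I think recent MySQL versions support a JSON type.
   - pressure_md --> usually TEXT, because it can hold a JSON value, although I think recent MySQL versions support a JSON type.
4. You cannot remove them, because OrionMySQLSink expects to find these metadata fields.
5. OrionMySQLSink does not add any primary key by itself. As you suggest, you can add one yourself as an auto-increment key.
6. Not necessarily. In fact, the grouping rules file can be empty; despite that error, Cygnus will run anyway.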
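As a rough sketch of the primary-key point above, the snippet below uses Python's built-in sqlite3 module as a stand-in for MySQL, purely so that it runs without a server. The table name sensor and the sample values come from the question; the id column is a hypothetical name. It shows that an INSERT which names its columns explicitly, as the Cygnus log does, leaves the auto-increment key to the database. In MySQL the key would be declared as INT NOT NULL AUTO_INCREMENT, and SQLite's handling of the quoted values is only loosely comparable to MySQL's casting.

```python
import sqlite3

# SQLite stands in for MySQL here so the sketch runs without a server.
# In MySQL the key would be: id INT NOT NULL AUTO_INCREMENT PRIMARY KEY.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE sensor (
        id INTEGER PRIMARY KEY AUTOINCREMENT,  -- hypothetical extra key column
        recvTime TEXT,
        fiwareservicepath TEXT,
        entityId TEXT,
        entityType TEXT,
        pressure INTEGER,
        pressure_md TEXT,
        temperature REAL,
        temperature_md TEXT
    )
    """
)

# The column list mirrors the "Fields" part of the Cygnus log; id is not named.
fields = ("recvTime", "fiwareservicepath", "entityId", "entityType",
          "pressure", "pressure_md", "temperature", "temperature_md")
row = ("2015-12-14T13:42:46.228Z", "sensor", "Room1", "Room",
       "123", "[]", "321", "[]")
sql = "INSERT INTO sensor ({}) VALUES ({})".format(
    ", ".join(fields), ", ".join("?" for _ in fields))

# Two inserts that never mention the key column.
conn.execute(sql, row)
conn.execute(sql, row)

rows = conn.execute(
    "SELECT id, pressure, temperature FROM sensor ORDER BY id").fetchall()
print(rows)  # the id values are assigned by the database, not by the writer
```

The extra key column only works this way because the writer lists its columns by name; an INSERT without a column list would have to supply a value for id as well.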
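EDIT
7. The problem is that not all the notifications Cygnus receives carry the same number of attributes. When they are aggregated into a batch, the number of fields and the number of values therefore do not match. If you look at the official documentation, you will see that all attributes must be sent when working in column mode:
"the column mode is only recommended if your subscription is designed for always sending the same attributes, even if they were not updated since the last notification."
Another solution might be to configure a batch_size of 1. Performance will probably suffer, but every notification will be processed independently of the others, so each MySQL query will have fields that match its values.
8. Just create the table with this auto-increment field; every time Cygnus inserts a new row, the field will be updated automatically.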
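The column-count mismatch discussed above can be illustrated with a small sketch. This is not Cygnus code: the function and variable names are hypothetical, and it only assumes, based on the log in the question, that the field list comes from the first notification in a batch while every notification contributes its own values.

```python
# Illustrative only: mimics how a batch of uneven notifications becomes one
# multi-row INSERT whose rows do not all match the field list.
COMMON = ["recvTime", "fiwareservicepath", "entityId", "entityType"]


def fields_for(attrs):
    """Column names for one notification: common fields, then attr and attr_md."""
    cols = list(COMMON)
    for name in attrs:
        cols += [name, name + "_md"]
    return cols


def first_bad_row(batch):
    """Return the 1-based index of the first row whose value count differs
    from the field count of the first notification, or None if all match."""
    expected = len(fields_for(batch[0]))
    for i, attrs in enumerate(batch, start=1):
        if len(fields_for(attrs)) != expected:
            return i
    return None


# Five notifications carrying only temperature, then two carrying both
# attributes, as in the log from the question.
batch = [["temperature"]] * 5 + [["pressure", "temperature"]] * 2

print(first_bad_row(batch))  # 6

# With a batch size of 1, each notification forms its own batch.
print([first_bad_row([n]) for n in batch])
```

The value 6 corresponds to the row reported in the MySQL error. If every notification carried both attributes, first_bad_row would return None for the whole batch, which is the situation the documentation asks for.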