How to insert data into druid via tranquility
Following the tutorial at http://druid.io/docs/latest/tutorials/tutorial-loading-streaming-data.html, I was able to insert data into Druid via the Kafka console.
The spec file looks like the following:
examples/indexing/wikipedia.spec
[
{
"dataSchema" : {
"dataSource" : "wikipedia",
"parser" : {
"type" : "string",
"parseSpec" : {
"format" : "json",
"timestampSpec" : {
"column" : "timestamp",
"format" : "auto"
},
"dimensionsSpec" : {
"dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
"dimensionExclusions" : [],
"spatialDimensions" : []
}
}
},
"metricsSpec" : [{
"type" : "count",
"name" : "count"
}, {
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
}, {
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
}, {
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "DAY",
"queryGranularity" : "NONE"
}
},
"ioConfig" : {
"type" : "realtime",
"firehose": {
"type": "kafka-0.8",
"consumerProps": {
"zookeeper.connect": "localhost:2181",
"zookeeper.connection.timeout.ms" : "15000",
"zookeeper.session.timeout.ms" : "15000",
"zookeeper.sync.time.ms" : "5000",
"group.id": "druid-example",
"fetch.message.max.bytes" : "1048586",
"auto.offset.reset": "largest",
"auto.commit.enable": "false"
},
"feed": "wikipedia"
},
"plumber": {
"type": "realtime"
}
},
"tuningConfig": {
"type" : "realtime",
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT10m",
"windowPeriod": "PT10m",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
"type": "messageTime"
}
}
}
]
I start the realtime node via
java -Xmx512m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=examples/indexing/wikipedia.spec -classpath config/_common:config/realtime:lib/* io.druid.cli.Main server realtime
In the Kafka console, I paste and enter the following:
{"timestamp": "2013-08-10T01:02:33Z", "page": "Good Bye", "language" : "en", "user" : "catty", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}
Then I perform a query by creating select.json and running
curl -X POST 'http://localhost:8084/druid/v2/?pretty' -H 'content-type: application/json' -d @select.json
select.json
{
"queryType": "select",
"dataSource": "wikipedia",
"dimensions":[],
"metrics":[],
"granularity": "all",
"intervals": [
"2000-01-01/2020-01-02"
],
"filter" : {"type":"and",
"fields" : [
{ "type": "selector", "dimension": "user", "value": "catty" }
]
},
"pagingSpec":{"pagingIdentifiers": {}, "threshold":500}
}
I get the following result.
[ {
"timestamp" : "2013-08-10T01:02:33.000Z",
"result" : {
"pagingIdentifiers" : {
"wikipedia_2013-08-10T00:00:00.000Z_2013-08-11T00:00:00.000Z_2013-08-10T00:00:00.000Z" : 0
},
"events" : [ {
"segmentId" : "wikipedia_2013-08-10T00:00:00.000Z_2013-08-11T00:00:00.000Z_2013-08-10T00:00:00.000Z",
"offset" : 0,
"event" : {
"timestamp" : "2013-08-10T01:02:33.000Z",
"continent" : "North America",
"robot" : "false",
"country" : "United States",
"city" : "San Francisco",
"newPage" : "true",
"unpatrolled" : "true",
"namespace" : "article",
"anonymous" : "false",
"language" : "en",
"page" : "Good Bye",
"region" : "Bay Area",
"user" : "catty",
"deleted" : 200.0,
"added" : 57.0,
"count" : 1,
"delta" : -143.0
}
} ]
}
} ]
It seems I have set up Druid correctly.
Now, I would like to insert data via an HTTP endpoint. According to How realtime data input to Druid?, the recommended way seems to be to use Tranquility.
I have started the indexing service via
java -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath config/_common:config/overlord:lib/*: io.druid.cli.Main server overlord
conf/server.json looks like
{
"dataSources" : [
{
"spec" : {
"dataSchema" : {
"dataSource" : "wikipedia",
"parser" : {
"type" : "string",
"parseSpec" : {
"format" : "json",
"timestampSpec" : {
"column" : "timestamp",
"format" : "auto"
},
"dimensionsSpec" : {
"dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
"dimensionExclusions" : [],
"spatialDimensions" : []
}
}
},
"metricsSpec" : [{
"type" : "count",
"name" : "count"
}, {
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
}, {
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
}, {
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "DAY",
"queryGranularity" : "NONE"
}
},
"tuningConfig" : {
"windowPeriod" : "PT10M",
"type" : "realtime",
"intermediatePersistPeriod" : "PT10M",
"maxRowsInMemory" : "100000"
}
},
"properties" : {
"task.partitions" : "1",
"task.replicants" : "1"
}
}
],
"properties" : {
"zookeeper.connect" : "localhost",
"http.port" : "8200",
"http.threads" : "8"
}
}
Then, I start the server using
bin/tranquility server -configFile conf/server.json
I perform a POST to http://xx.xxx.xxx.xxx:8200/v1/post/wikipedia with content-type equal to application/json:
{"timestamp": "2013-08-10T01:02:33Z", "page": "Selamat Pagi", "language" : "en", "user" : "catty", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}
I get the following response:
{"result":{"received":1,"sent":0}}
It seems Tranquility received our data, but failed to send it to Druid!
I tried running
curl -X POST 'http://localhost:8084/druid/v2/?pretty' -H 'content-type: application/json' -d @select.json
but did not get the data I inserted via Tranquility.
Any idea why? Thanks.
Setting up Druid to correctly handle real-time data insertion is extremely difficult.
The best option I have found is to use https://github.com/implydata. Imply is a set of wrappers around Druid that make it easier to use.
However, Imply's real-time insertion is not perfect either. I experienced an OutOfMemoryException after inserting 30 million items via realtime, which caused the loss of the 30 million rows that had already been inserted.
Details about the data loss can be found here: https://groups.google.com/forum/#!topic/imply-user-group/95xpYojxiOg
Druid's streaming windowPeriod is very short (10 minutes). Events outside this period are ignored.
This usually happens when the data you send falls outside the window period. If you are inserting data manually, supply the exact current UTC timestamp, accurate to the millisecond. If you generate the data with a script, this is easy to do; just make sure it is the current UTC time.
When you get {"result":{"received":1,"sent":0}}, your worker is running fine. Tranquility decides which data to send to Druid based on the timestamp associated with each event.
This window is determined by the "windowPeriod" configuration. So if your type is realtime ("type":"realtime") and the window period is PT10M ("windowPeriod" : "PT10M"), Tranquility will send any data that falls between t-10 and t+10, and will send nothing outside that period.
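Concretely: the event in the question carries the timestamp 2013-08-10T01:02:33Z, which is far outside any 10-minute window, so Tranquility silently drops it. A minimal sketch of a POST that should land inside the window, assuming Tranquility listens on localhost:8200 as configured in the question:
# Generate the current UTC time in ISO-8601 format; second precision
# is plenty for a 10-minute window.
TIMESTAMP=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
curl -X POST 'http://localhost:8200/v1/post/wikipedia' \
  -H 'Content-Type: application/json' \
  -d "{\"timestamp\": \"$TIMESTAMP\", \"page\": \"Selamat Pagi\", \"language\": \"en\", \"user\": \"catty\", \"added\": 57, \"deleted\": 200, \"delta\": -143}"
# (Remaining dimensions omitted for brevity; missing dimensions are stored as null.)
If the event lands inside the window, the response should report "sent":1 instead of "sent":0.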
I disagree about the insertion efficiency problem; we have been sending 3 million rows every 15 minutes since June 2016 and it runs beautifully. Admittedly, we have a more robust infrastructure for that scale.
Another reason for rows not being inserted is running out of memory on the coordinator/overlord.
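If overlord memory is the problem, one mitigation is simply a larger JVM heap. A sketch, reusing the launch command from the question with -Xmx raised (4g is an arbitrary example; size it to your ingestion volume):
java -Xmx4g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath config/_common:config/overlord:lib/*: io.druid.cli.Main server overlord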