JSON column as key in Kafka producer, pushing to different partitions based on the key
As we know, we can send a key with each record to a Kafka producer; the key is hashed internally to decide which partition of the topic the record goes to. I have a producer in which I send data in JSON format.
[
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 07:50:42",
    "TIME": 75042
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:02:26",
    "TIME": 80226
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:39:55",
    "TIME": 83955
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:43:26",
    "TIME": 84326
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:44:22",
    "TIME": 84422
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:45:09",
    "TIME": 84509
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:45:58",
    "TIME": 84558
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99",
    "EVENTTIME": "2020-07-23 08:45:58",
    "TIME": 84558
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99",
    "EVENTTIME": "2020-07-23 08:45:58",
    "TIME": 84558
  }
]
I want to push this data to different partitions of the topic based on the key (DEVICEID).
I created the topic with two partitions, 0 and 1, but it stores all the data in partition 0. I would like every unique key (device ID) to be stored in a different partition. Code:
import java.io.File
import java.util.Properties

import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object Producer extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.connect.json.JsonSerializer")

  val producer = new KafkaProducer[String, JsonNode](props)
  println("inside producer")

  val mapper = (new ObjectMapper() with ScalaObjectMapper).
    registerModule(DefaultScalaModule).
    configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).
    findAndRegisterModules(). // register joda and java-time modules automatically
    asInstanceOf[ObjectMapper with ScalaObjectMapper]

  val filename = "/Users/rishunigam/Documents/devicd.json"
  val jsonNode: JsonNode = mapper.readTree(new File(filename))

  for (i <- 0 until jsonNode.size()) {
    val js = jsonNode.get(i)
    // asText() yields the raw string; toString would wrap the key in extra quotes
    val key = js.findValue("DEVICEID").asText()
    println(key)
    println(js)
    val record = new ProducerRecord[String, JsonNode]("tpch.devices_logs", key, js)
    println(record)
    producer.send(record)
  }

  println("producer complete")
  producer.close()
}
It stored all the data in partition 0.
That does not mean it isn't working. It just means that the hashes of your keys ended up in the same partition.
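This is easy to see with a toy version of hash-based partitioning. Kafka's default partitioner actually applies a murmur2 hash to the serialized key bytes and takes it modulo the partition count; the sketch below uses Scala's built-in hashCode as a stand-in for the hash function, purely to illustrate that two distinct keys can legitimately land in the same one of two partitions.

object PartitionDemo {
  // Stand-in for Kafka's hash-mod-numPartitions scheme (real Kafka uses murmur2)
  def partitionFor(key: String, numPartitions: Int): Int =
    math.abs(key.hashCode) % numPartitions

  def main(args: Array[String]): Unit = {
    val keys = Seq("24:6f:28:99:9e:dc", "24:6f:28:99")
    keys.foreach { k =>
      println(s"$k -> partition ${partitionFor(k, 2)}")
    }
  }
}

With only two partitions there is roughly a 50% chance that any two device IDs collide on the same partition, which matches the behavior you observed.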
If you want to override the default partitioner, you need to define your own Partitioner class that parses the message and assigns an appropriate partition, then set partitioner.class in the producer properties.
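A minimal sketch of that routing logic is below. The class name com.example.DeviceIdPartitioner and the pinned-partition map are hypothetical; in a real setup the logic would live inside a class implementing org.apache.kafka.clients.producer.Partitioner (from kafka-clients, which is not on the classpath here), registered via props.put("partitioner.class", "com.example.DeviceIdPartitioner"). The routing is written as a pure function so it can run standalone.

object DeviceIdRouting {
  // Device IDs pinned to fixed partitions; any unknown device falls back to a hash.
  // In a real Partitioner implementation, this map could come from configure(configs).
  val pinned: Map[String, Int] = Map(
    "24:6f:28:99:9e:dc" -> 0,
    "24:6f:28:99"       -> 1
  )

  def route(deviceId: String, numPartitions: Int): Int =
    pinned.getOrElse(deviceId, math.abs(deviceId.hashCode) % numPartitions)
}

The fallback matters: pinning alone breaks as soon as a device ID appears that the map has never seen, which is exactly the problem raised below.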
I want every unique key (device ID) to be stored in a different partition.
Then you would have to know your complete data set ahead of time in order to create N partitions for N devices. What happens when you add a brand-new device?