在主题之间过滤
Filtering between topics
我在一个主题中有 1,000 条记录。我正在尝试根据薪水将记录从输入主题过滤到输出主题。
例如:我想要工资高于30000的人的记录
为此,我正在尝试使用 Java 使用 KSTREAMS。
记录为文本格式(逗号分隔),示例:
first_name, last_name, email, gender, ip_address, country, salary
Redacted,Tranfield,user@example.com,Female,45.25.XXX.XXX,Russia,345.01
Redacted,Merck,user@example.com,Male,236.224.XXX.XXX,Belarus,321.96
Redacted,Kopisch,user@example.com,Male,61.36.XXX.XXX,Morocco,345.05
Redacted,Edds,user@example.com,Male,6.87.XXX.XXX,Poland,321.72
Redacted,Alston,user@example.com,Female,56.146.XXX.XXX,Indonesia,345.16
...
这是我的代码:
public class StreamsStartApp {
public static void main(String[] args) {
System.out.println();
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-starter-app");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// Stream from Kafka topic
KStream<Long, Long> newInput = builder.stream("word-count-input");
Stream<Long, Long> usersAndColours = newInput
// step 1 - we ensure that a comma is here as we will split on it
.filter(value -> value.contains(",")
// step 2 - we select a key that will be the user id
.selectKey((key, value) -> value.split(",")[6])
// step 3 - got stuck here.
// .filter(key -> key.value[6] > 30000
// .selectKey((new1, value1) -> value1.split)(",")[3])
// .filter((key, value) -> key.greater(10));
// .filter((key, value) -> key > 10);
// .filter(key -> key.getkey().intValue() > 10);
usersAndColours.to("new-output");
Runtime.getRuntime().addShutdownHook(new Thread(streams::close))
在靠近步骤 1 的上述代码中,我使用“,”分隔样本数据。
在第 2 步中,我选择了一个字段,即:工资字段作为键。
现在在第 3 步中,我正在尝试使用 salary 字段过滤数据。
我尝试了一些评论的方法,但没有任何效果。
任何想法都会有所帮助。
首先,你的key和value都是String serdes,不是Longs,所以KStream<Long, Long>
是不正确的。
而 value.split(",")[6]
只是一个字符串,而不是一个双精度数。 (或 Long,因为有小数值)
您需要从您的列中删除 $
并将字符串解析为 Double,然后您可以对其进行过滤。它也不是 key.value[6]
因为您的键不是具有值字段的对象。
而且你应该将电子邮件作为密钥,而不是薪水,如果你甚至需要密钥,那就是
实际上,您可以在一行中完成此操作(为了便于阅读,此处分为两行)
newInput.filter(value -> value.contains(",") &&
Double.parseDouble(value.split(",")[6].replace("$", "")) > 30000);
我在一个主题中有 1,000 条记录。我正在尝试根据薪水将记录从输入主题过滤到输出主题。
例如:我想要工资高于30000的人的记录
为此,我正在尝试使用 Java 使用 KSTREAMS。
记录为文本格式(逗号分隔),示例:
first_name, last_name, email, gender, ip_address, country, salary
Redacted,Tranfield,user@example.com,Female,45.25.XXX.XXX,Russia,345.01
Redacted,Merck,user@example.com,Male,236.224.XXX.XXX,Belarus,321.96
Redacted,Kopisch,user@example.com,Male,61.36.XXX.XXX,Morocco,345.05
Redacted,Edds,user@example.com,Male,6.87.XXX.XXX,Poland,321.72
Redacted,Alston,user@example.com,Female,56.146.XXX.XXX,Indonesia,345.16
...
这是我的代码:
public class StreamsStartApp {
public static void main(String[] args) {
System.out.println();
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-starter-app");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// Stream from Kafka topic
KStream<Long, Long> newInput = builder.stream("word-count-input");
Stream<Long, Long> usersAndColours = newInput
// step 1 - we ensure that a comma is here as we will split on it
.filter(value -> value.contains(",")
// step 2 - we select a key that will be the user id
.selectKey((key, value) -> value.split(",")[6])
// step 3 - got stuck here.
// .filter(key -> key.value[6] > 30000
// .selectKey((new1, value1) -> value1.split)(",")[3])
// .filter((key, value) -> key.greater(10));
// .filter((key, value) -> key > 10);
// .filter(key -> key.getkey().intValue() > 10);
usersAndColours.to("new-output");
Runtime.getRuntime().addShutdownHook(new Thread(streams::close))
在靠近步骤 1 的上述代码中,我使用“,”分隔样本数据。
在第 2 步中,我选择了一个字段,即:工资字段作为键。
现在在第 3 步中,我正在尝试使用 salary 字段过滤数据。
我尝试了一些评论的方法,但没有任何效果。
任何想法都会有所帮助。
首先,你的key和value都是String serdes,不是Longs,所以KStream<Long, Long>
是不正确的。
而 value.split(",")[6]
只是一个字符串,而不是一个双精度数。 (或 Long,因为有小数值)
您需要从您的列中删除 $
并将字符串解析为 Double,然后您可以对其进行过滤。它也不是 key.value[6]
因为您的键不是具有值字段的对象。
而且你应该将电子邮件作为密钥,而不是薪水,如果你甚至需要密钥,那就是
实际上,您可以在一行中完成此操作(为了便于阅读,此处分为两行)
newInput.filter(value -> value.contains(",") &&
Double.parseDouble(value.split(",")[6].replace("$", "")) > 30000);