Flink DataStream 将对象映射到对象列表
Flink DataStream map Object to list of Objects
我正在尝试将对象 A 的数据流转换为对象 B 的列表。如下例所示,我正在从 flink 消费者读取数据流,我需要转换为数据流,这样我就可以 运行 在 MappedMetric 对象上随 timeWindow 进行一些过滤器和聚合。单个 LogEvent 可能会生成 MappedMetric 对象列表,因此如果我使用 MapFunction,结果将是 DataStream。但是,我认为聚合不能在 DataStream 上 运行。非常感谢任何帮助。提前致谢。
// Input Object
public class LogEvent {
private String id;
private long timestamp;
private List<LogMessage> message;
}
public class LogMessage {
private String accountId;
private List<Metric> metrics;
}
public class Metric {
private String name;
private double value;
}
// Should be transformed to
public class MappedMetric {
private String accountId;
private String name;
private double value;
private long timestamp;
}
final DataStream<LogEvent> inputDataStream = **read from Flink consumer**
final DataStream<MappedMetric> aggregatedMetrics = inputDataStream
.map(**SomeMapFunction**)
.keyBy(**SomeKey**)
您想使用 FlatMap
函数,它可以为单个输入生成多个结果。每个结果都是一个 MappedMetric
记录,而不是一个列表。
我正在尝试将对象 A 的数据流转换为对象 B 的列表。如下例所示,我正在从 flink 消费者读取数据流,我需要转换为数据流,这样我就可以 运行 在 MappedMetric 对象上随 timeWindow 进行一些过滤器和聚合。单个 LogEvent 可能会生成 MappedMetric 对象列表,因此如果我使用 MapFunction,结果将是 DataStream。但是,我认为聚合不能在 DataStream
上 运行。非常感谢任何帮助。提前致谢。
// Input Object
public class LogEvent {
private String id;
private long timestamp;
private List<LogMessage> message;
}
public class LogMessage {
private String accountId;
private List<Metric> metrics;
}
public class Metric {
private String name;
private double value;
}
// Should be transformed to
public class MappedMetric {
private String accountId;
private String name;
private double value;
private long timestamp;
}
final DataStream<LogEvent> inputDataStream = **read from Flink consumer**
final DataStream<MappedMetric> aggregatedMetrics = inputDataStream
.map(**SomeMapFunction**)
.keyBy(**SomeKey**)
您想使用 FlatMap
函数,它可以为单个输入生成多个结果。每个结果都是一个 MappedMetric
记录,而不是一个列表。