Spark Streaming for time series processing (divide data by time interval)
I receive a data stream from a UDP socket (live nginx logs); the data structure is:
date | ip | mac | objectName | rate | size
2016-04-05 11:17:34 | 10.0.0.1 | e1:e2 | book1 | 10 | 121
2016-04-05 11:17:34 | 10.0.0.2 | a5:a8 | book2351 | 8 | 2342
2016-04-05 11:17:34 | 10.0.0.3 | d1:b56| bookA5 | 10 | 12
2016-04-05 11:17:35 | 10.0.0.1 | e1:e2 | book67 | 10 | 768
2016-04-05 11:17:35 | 10.0.0.2 | a5:a8 | book2351 | 8 | 897
2016-04-05 11:17:35 | 10.0.0.3 | d1:b56| bookA5 | 9 | 34
2016-04-05 11:17:35 | 10.0.0.4 | c7:c2 | book99 | 9 | 924
...
2016-04-05 11:18:01 | 10.0.0.1 | e1:e2 | book-10 | 8 | 547547
2016-04-05 11:18:17 | 10.0.0.4 | c7:c2 | book99 | 10 | 23423
2016-04-05 11:18:18 | 10.0.0.3 | d1:b56| bookA5 | 10 | 1138
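For reference, a minimal sketch of parsing one such line; only the " | " separator and the timestamp format are taken from the sample above, the class and field names are illustrative assumptions:
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical parser for one log line; everything except the separator and
// the timestamp format is an illustrative assumption.
class ParsedLogLine {
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    final LocalDateTime time;
    final String ip, mac, objectName;
    final int rate, size;

    ParsedLogLine(String line) {
        String[] f = line.split("\\|");                    // "date | ip | mac | objectName | rate | size"
        this.time = LocalDateTime.parse(f[0].trim(), FMT);
        this.ip = f[1].trim();
        this.mac = f[2].trim();
        this.objectName = f[3].trim();
        this.rate = Integer.parseInt(f[4].trim());
        this.size = Integer.parseInt(f[5].trim());
    }
}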
I have to:
- aggregate the data by minute: one result row per (minute, ip, mac)
- objectName: it can change within the minute and I have to take the first one, i.e. for 2016-04-05 11:17:34 | 10.0.0.1 | e1:e2 the object changed from book1 to book67, so the result must be book1
- rate: the number of rate changes within the minute
- size: the difference between sizes (previous time within the partition vs. current time within the partition), i.e. for 2016-04-05 11:17:34 | 10.0.0.1 | e1:e2 it is ... 768 - 121
So, the result (without calculating the size); a small sketch of this aggregation follows the table:
date | ip | mac | objectName | changes | size
2016-04-05 11:17:00 | 10.0.0.1 | e1:e2 | book1 | 0 | 768 - 121
2016-04-05 11:17:00 | 10.0.0.2 | a5:a8 | book2351 | 0 | 897 - 2342
2016-04-05 11:17:00 | 10.0.0.3 | d1:b56| bookA5 | 1 | 34 - 12
2016-04-05 11:17:00 | 10.0.0.4 | c7:c2 | book99 | 0 | 924
...
2016-04-05 11:18:00 | 10.0.0.1 | e1:e2 | book-10 | 0 | 547547
2016-04-05 11:18:00 | 10.0.0.4 | c7:c2 | book99 | 0 | 23423
2016-04-05 11:18:00 | 10.0.0.3 | d1:b56| bookA5 | 0 | 1138
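To make the required aggregation concrete, here is a minimal, non-Spark sketch of the reduction for a single (minute, ip, mac) group. The Rec and Agg classes are illustrative stand-ins, and the records are assumed to be sorted by their full timestamp.
import java.util.List;

// Illustration of the aggregation rules above for ONE (minute, ip, mac) group;
// this is not the Spark job itself.
class MinuteAggregation {
    static class Rec {
        final String objectName; final int rate; final int size;
        Rec(String objectName, int rate, int size) { this.objectName = objectName; this.rate = rate; this.size = size; }
    }
    static class Agg {
        final String objectName; final int changes; final int size;
        Agg(String objectName, int changes, int size) { this.objectName = objectName; this.changes = changes; this.size = size; }
    }

    // records must be sorted by their full timestamp (ascending)
    static Agg aggregate(List<Rec> sorted) {
        String firstObject = sorted.get(0).objectName;            // the first objectName within the minute wins
        int changes = 0;
        for (int i = 1; i < sorted.size(); i++) {                 // count how many times the rate changed
            if (sorted.get(i).rate != sorted.get(i - 1).rate) changes++;
        }
        int firstSize = sorted.get(0).size;
        int lastSize = sorted.get(sorted.size() - 1).size;
        // a single record keeps its size as-is (e.g. book99 -> 924), otherwise last - first
        int sizeDiff = sorted.size() == 1 ? firstSize : lastSize - firstSize;
        return new Agg(firstObject, changes, sizeDiff);
    }
}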
Here is a snapshot of my code. I know about updateStateByKey
and window
, but I can't quite understand how to flush the data to a database or the file system when the period (minute) changes:
private static final Duration SLIDE_INTERVAL = Durations.seconds(10);
private static final String nginxLogHost = "localhost";
private static final int nginxLogPort = 9999;
private class Raw {
LocalDateTime time; // full timestamp, with seconds
String ip;
String mac;
String objectName;
int rate;
int size;
}
private class Key {
LocalDateTime time; // timestamp truncated to the minute (seconds = 00)
String ip;
String mac;
}
private class RawValue {
LocalDateTime time; // full timestamp, with seconds
String objectName;
int rate;
int size;
}
private class Value {
String objectName;
int changes;
int size;
}
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("TestNginxLog");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
JavaStreamingContext jssc = new JavaStreamingContext(conf, SLIDE_INTERVAL);
jssc.checkpoint("/tmp");
JavaReceiverInputDStream<Raw> logRecords = jssc.receiverStream(new NginxUDPReceiver(nginxLogHost, nginxLogPort));
PairFunction<Raw, Key, RawValue> pairFunction = (PairFunction<Raw, Key, RawValue>) rawLine -> {
LocalDateTime time = rawLine.getDateTime();
Key k = new Key(time.withSecond(0).withNano(0), rawLine.getIp(), rawLine.getMac()); // truncate to the minute
RawValue v = new RawValue(time, rawLine.getObjectName(), rawLine.getRate(), rawLine.getSize());
return new Tuple2<>(k, v);
};
JavaPairDStream<Key, RawValue> logDStream = logRecords.mapToPair(pairFunction);
This is a partial answer, but the question is not finished yet. After mapToPair
I use:
// 1 key - N values
JavaPairDStream<Key, Iterable<RawValue>> abonentConnects = logDStream.groupByKey();
// Accumulate data
Function2<List<Iterable<RawValue>>, Optional<List<RawValue>>, Optional<List<RawValue>>> updateFunc = (Function2<List<Iterable<RawValue>>, Optional<List<RawValue>>, Optional<List<RawValue>>>) (values, previousState) -> {
List<RawValue> sum = previousState.or(new ArrayList<>());
for (Iterable<RawValue> v : values) {
v.forEach(sum::add);
}
return Optional.of(sum);
};
JavaPairDStream<Key, List<RawValue>> state = abonentConnects.updateStateByKey(updateFunc);
// filter data (previous minute)
Function<Tuple2<Key, List<RawValue>>, Boolean> filterFunc = (Function<Tuple2<Key, List<RawValue>>, Boolean>) v1 -> {
LocalDateTime previousTime = LocalDateTime.now().minusMinutes(1).withSecond(0).withNano(0);
LocalDateTime valueTime = v1._1().getTime();
return valueTime.compareTo(previousTime) == 0;
};
JavaPairDStream<Key, List<RawValue>> filteredRecords = state.filter(filterFunc);
// save data
filteredRecords.foreachRDD(x -> {
if (x.count() > 0) {
x.saveAsTextFile("/tmp/xxx/grouped/" + LocalDateTime.now().toString().replace(":", "-").replace(".", "-"));
}
});
jssc.start();
jssc.awaitTermination();
This produces result data, but since the operation is executed every 5 seconds, I get the same data duplicated every 5 seconds.
I know that I have to use Optional.absent()
to clear saved data from the stream. I have tried to use it, but I could not combine the two steps into one piece: save the data to the file system or to a HashMap, and immediately clear the saved data.
Question: how do I do this?
So, I will close this question with my own answer. You can use this sample function as the argument to updateStateByKey
. The key words in this code are: Optional.absent()
clears data that has already been saved, Optional.of(...
groups the data, and setAggregateReady(true)
.
The last one is used to save data to an external target (a DB or the file system) through a filter on getAggregateReady(true)
and one of the Spark Streaming output operations, e.g. foreachRDD
.
After that, when the next batch of data falls into updateStateByKey
, the flagged rows will be evicted by removeIf(T::isAggregateReady)
.
/**
 * Aggregates data across batches.
 * <p>
 * currentBatchValues - the values received in the current batch
 * previousBatchesState - the values received in all previous batches
 * You have to clear data (return Optional.absent() for it) to eliminate it from the DStream.
 * First batch: the data is checked for aggregateReady.
 * Second batch: data flagged aggregateReady=true is removed from the DStream (you have to save it to a DB or another target before this cleanup).
 */
protected Function2<List<Iterable<T>>, Optional<List<T>>, Optional<List<T>>> updateDataRowsFunc = (currentBatchValues, previousBatchesState) -> {
Optional<List<T>> res;
//log.debug("previousBatchesState isPresent {}", previousBatchesState.isPresent());
//log.debug("previousBatchesState {}", previousBatchesState);
//log.debug("currentBatchValues isEmpty {}", currentBatchValues.isEmpty());
//log.debug("currentBatchValues {}", currentBatchValues);
// previous data flagged aggregateReady has already been saved, so drop it from the state
if (previousBatchesState.isPresent()) {
log.debug("count before remove = {}", previousBatchesState.get().size());
previousBatchesState.get().removeIf(T::isAggregateReady);
// make the previous state absent if all of its data has already been aggregated
int cntBefore = previousBatchesState.get().size();
if (cntBefore == 0) previousBatchesState = Optional.absent();
}
// note: the comparator cannot be defined outside this function, otherwise a "Task not serializable" error occurs
Comparator<T> dataRowByAggGroupComparator = (o1, o2) -> o1.getAggregateGroup().compareTo(o2.getAggregateGroup());
// no data was collected in previous batches && data exists in the current batch
if (!previousBatchesState.isPresent() && !currentBatchValues.isEmpty()) {
log.debug("algorithm 1");
// the currentBatchValues list contains only 1 element (an Iterable of 1..N NginxDataRow records), so we take that Iterable and convert it to a List
// note: there may be another way to compare the Iterable elements without converting to a List
List<T> listDataRow = new ArrayList<>();
currentBatchValues.get(0).forEach(listDataRow::add);
// in one batch we can get data for 2 aggregateGroups, if the batch was split between groups
LocalDateTime minAggGroup = listDataRow.stream().min(dataRowByAggGroupComparator).get().getAggregateGroup();
LocalDateTime maxAggGroup = listDataRow.stream().max(dataRowByAggGroupComparator).get().getAggregateGroup();
// batch was split between groups
if (!minAggGroup.equals(maxAggGroup)) {
log.debug("batch was split between groups {} and {}", minAggGroup, maxAggGroup);
// set ready to aggregate for previous group of data, because aggregate group was changed
listDataRow.stream().filter(z -> z.getAggregateGroup().equals(minAggGroup)).forEach(z -> z.setAggregateReady(true));
}
res = Optional.of(listDataRow);
//log.debug("agg res = {}", res);
// data exists in both the previous and the current batches
} else if (previousBatchesState.isPresent() && !currentBatchValues.isEmpty()) {
log.debug("algorithm 2");
List<T> listCurrentBatchDataRow = new ArrayList<>();
currentBatchValues.get(0).forEach(listCurrentBatchDataRow::add);
LocalDateTime previousBatchAggGroup = previousBatchesState.get().stream().findFirst().get().getAggregateGroup();
// in one batch we can get data for 2 aggregateGroups, if the batch was split between groups
LocalDateTime minCurrentBatchAggGroup = listCurrentBatchDataRow.stream().min(dataRowByAggGroupComparator).get().getAggregateGroup();
LocalDateTime maxCurrentBatchAggGroup = listCurrentBatchDataRow.stream().max(dataRowByAggGroupComparator).get().getAggregateGroup();
// previous and current data in different groups
if (!previousBatchAggGroup.equals(maxCurrentBatchAggGroup)) {
log.debug("previous batch needed to save, because agg group was changed from {} to {}", previousBatchAggGroup, maxCurrentBatchAggGroup);
// set ready to aggregate for previous group of data, because aggregate group was changed
previousBatchesState.get().stream().forEach(z -> z.setAggregateReady(true));
// batch was split between groups
if (!minCurrentBatchAggGroup.equals(maxCurrentBatchAggGroup)) {
log.debug("batch was split between groups {} and {}", minCurrentBatchAggGroup, maxCurrentBatchAggGroup);
listCurrentBatchDataRow.stream().filter(z -> z.getAggregateGroup().equals(minCurrentBatchAggGroup)).forEach(z -> z.setAggregateReady(true));
}
}
// union previous and current batches data
previousBatchesState.get().addAll(listCurrentBatchDataRow);
res = Optional.of(previousBatchesState.get());
//log.debug("agg res = {}", res);
// data exists in the previous batch but the current batch is empty
} else if (previousBatchesState.isPresent() && currentBatchValues.isEmpty()) {
log.debug("algorithm 3");
res = previousBatchesState;
//log.debug("agg res = {}", res);
// all of the previous data was already aggregated and made absent()
} else if (!previousBatchesState.isPresent() && currentBatchValues.isEmpty()) {
log.debug("algorithm 4");
res = Optional.absent();
} else {
log.error("Strange situation, you have to check log-file");
res = null;
}
// if an abonent's data arrived within one minute and the abonent then closed the connection, those rows would stay in the DStream forever
// so after some period we force them to be marked ready for saving
if (res != null && res.isPresent()) {
res.get().stream().filter(z -> Math.abs(java.time.Duration.between(z.getAggregateGroup(), LocalDateTime.now()).getSeconds() / 60) > FORCED_SAVE_INTERVAL).forEach(z -> z.setAggregateReady(true));
}
return res;
};
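For completeness, the saved-and-flagged rows still have to be written out by an output operation before the next batch's removeIf(T::isAggregateReady) evicts them. Below is a hedged sketch of that output side; the names state, Key, T and the target path are assumptions carried over from the snippets above, not code from the original application.
// Sketch of the output side (assumed names: `state` is the DStream returned by
// updateStateByKey(updateDataRowsFunc); T exposes isAggregateReady(); Key is the grouping key).
JavaPairDStream<Key, List<T>> readyRecords = state
    // keep only the rows that the update function flagged as ready to aggregate
    .mapValues(rows -> {
        List<T> ready = new ArrayList<>();
        for (T row : rows) {
            if (row.isAggregateReady()) {
                ready.add(row);
            }
        }
        return ready;
    })
    // drop keys that have nothing ready yet
    .filter(kv -> !kv._2().isEmpty());
readyRecords.foreachRDD(rdd -> {
    if (rdd.count() > 0) {
        // save to a DB or the file system BEFORE the next batch, where
        // removeIf(T::isAggregateReady) will evict these rows from the state
        rdd.saveAsTextFile("/tmp/xxx/aggregated/" + System.currentTimeMillis());
    }
});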