How to extract a function where I can pass functions as parameters for these chained lambdas in Java 8?
In a piece of code that uses Kafka Streams, one pattern keeps repeating: I do a map, then group by key, then reduce. It looks like this:
KTable<ProjectKey, EventConfigurationIdsWithDeletedState> eventConfigurationsByProjectTable = eventConfigurationStream
        .map((key, value) -> {
            Map<String, Boolean> eventConfigurationUpdates = new HashMap<>();
            eventConfigurationUpdates.put(key.getEventConfigurationId(), value != null);
            ProjectKey projectKey = ProjectKey.newBuilder().setId(key.getProjectId()).build();
            EventConfigurationIdsWithDeletedState eventConfigurationIdsWithDeletedState = EventConfigurationIdsWithDeletedState.newBuilder().setEventConfigurations(eventConfigurationUpdates).build();
            return KeyValue.pair(projectKey, eventConfigurationIdsWithDeletedState);
        })
        .groupByKey()
        .reduce((aggValue, newValue) -> {
            Map<String, Boolean> newEventConfigurations = newValue.getEventConfigurations();
            Map<String, Boolean> aggEventConfigurations = aggValue.getEventConfigurations();
            Map.Entry<String, Boolean> newEntry = newEventConfigurations.entrySet().iterator().next();
            if (newEntry.getValue())
                aggEventConfigurations.putAll(newEventConfigurations);
            else
                aggEventConfigurations.remove(newEntry.getKey());
            if (aggEventConfigurations.size() == 0)
                return null;
            return aggValue;
        });
(eventConfigurationStream is of type KStream<EventConfigurationKey, EventConfiguration>.)
Another example that follows this pattern. Note that it also contains a filter, but that is not always the case:
KTable<ProjectKey, NotificationSettingsTransition> globalNotificationSettingsPerProjectTable = notificationSettingTable.toStream()
        .filter((key, value) -> {
            return key.getEventConfigurationId() == null;
        })
        .map((key, value) -> {
            ProjectKey projectKey = ProjectKey.newBuilder().setId(key.getProjectId()).build();
            Map<String, NotificationSetting> notificationSettingsMap = new HashMap<>();
            notificationSettingsMap.put(getAsCompoundKeyString(key), value);
            NotificationSettingsTransition notificationSettingTransition = NotificationSettingsTransition
                    .newBuilder()
                    .setNotificationSettingCompoundKeyLastUpdate(getAsCompoundKey(key))
                    .setNotificationSettingLastUpdate(value)
                    .setEventConfigurationIds(new ArrayList<>())
                    .setNotificationSettingsMap(notificationSettingsMap)
                    .build();
            return KeyValue.pair(projectKey, notificationSettingTransition);
        })
        .groupByKey()
        .reduce((aggValue, newValue) -> {
            Map<String, NotificationSetting> notificationSettingMap = aggValue.getNotificationSettingsMap();
            String compoundKeyAsString = getAsString(newValue.getNotificationSettingCompoundKeyLastUpdate());
            if (newValue.getNotificationSettingLastUpdate() != null)
                notificationSettingMap.put(compoundKeyAsString, newValue.getNotificationSettingLastUpdate());
            else
                notificationSettingMap.remove(compoundKeyAsString);
            aggValue.setNotificationSettingCompoundKeyLastUpdate(newValue.getNotificationSettingCompoundKeyLastUpdate());
            aggValue.setNotificationSettingLastUpdate(newValue.getNotificationSettingLastUpdate());
            aggValue.setNotificationSettingsMap(notificationSettingMap);
            return aggValue;
        });
(notificationSettingTable is of type KTable<NotificationSettingKey, NotificationSetting>, but it is immediately converted to a KStream as well.)
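How can I extract this into a function to which I pass one function for the map code and one for the reduce code, without having to repeat the .map().groupByKey().reduce() pattern? It should stay typed, even though the return types differ and depend on the code in the map function. Ideally in Java 8, but later versions are possible. I think I have a fairly good idea of how to do this when the inner types of the KeyValuePair in the map code don't change, but I'm not sure how to do it in this case.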
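You can parameterize your function to accept two generic functions, whose types are inferred when the function is called (or can be given explicitly if inference is not possible).
For the input to map you need a BiFunction<K, V, KeyValue<T, U>>, and for reduce you need a BiFunction<U, U, U>, where:
- K is the type of the key in the map function.
- V is the type of the value in the map function.
- T is the type of the key returned by the map function.
- U is the type of the value returned by the map function, which is also the type of the aggregate and the return type of the reduce function.
If you look at KStream and KGroupedStream, you can find more detailed type information with which to constrain the functions further.
This turns your custom function into something like this:
<K, V, T, U> KTable<T, U> mapGroupReduce(final KStream<K, V> stream, final BiFunction<K, V, KeyValue<T, U>> mapper, final BiFunction<U, U, U> reducer) {
    // KStream.map expects a KeyValueMapper and KGroupedStream.reduce a Reducer, so the BiFunctions are adapted with method references
    return stream.map(mapper::apply).groupByKey().reduce(reducer::apply);
}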
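Then you can call it like this:
mapGroupReduce(yourStream,
        (key, value) -> KeyValue.pair(key, value), // build your new key and value here
        (acc, value) -> acc);                      // combine the aggregate with the new value here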
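To see how the type parameters flow without running Kafka, here is a minimal plain-JDK sketch. It is an in-memory stand-in, not Kafka Streams: it assumes a List of Map.Entry records plays the role of the stream and a LinkedHashMap plays the role of the resulting table, and the class MapGroupReduceDemo and its entry helper are hypothetical. The mapper's wildcard bounds follow the same pattern as the KeyValueMapper parameter of KStream.map, and the key/value types change from String/String to String/Integer while everything stays typed, both with inferred and with explicit type arguments.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

// Hypothetical in-memory stand-in for stream.map(mapper).groupByKey().reduce(reducer).
// Records are a List of entries and the "table" is a Map; this is NOT Kafka Streams.
final class MapGroupReduceDemo {

    // K, V: input key/value types; KR, VR: key/value types produced by the mapper.
    static <K, V, KR, VR> Map<KR, VR> mapGroupReduce(
            List<Map.Entry<K, V>> records,
            BiFunction<? super K, ? super V, ? extends Map.Entry<? extends KR, ? extends VR>> mapper,
            BinaryOperator<VR> reducer) {
        Map<KR, VR> table = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            // "map": re-key the record and possibly change its value type.
            Map.Entry<? extends KR, ? extends VR> mapped = mapper.apply(record.getKey(), record.getValue());
            KR key = mapped.getKey();
            VR value = mapped.getValue();
            // "groupByKey" + "reduce": the first value for a key starts the aggregate,
            // later values for the same key are folded into it with the reducer.
            table.put(key, table.containsKey(key) ? reducer.apply(table.get(key), value) : value);
        }
        return table;
    }

    // Hypothetical helper playing the role of KeyValue.pair.
    static <A, B> Map.Entry<A, B> entry(A key, B value) {
        return new SimpleEntry<>(key, value);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> events = Arrays.asList(
                entry("p1/e1", "created"),
                entry("p1/e2", "created"),
                entry("p2/e3", "created"));

        // Inferred at the call site: K = V = String, KR = String, VR = Integer.
        Map<String, Integer> eventsPerProject = mapGroupReduce(
                events,
                (key, value) -> entry(key.split("/")[0], 1),
                Integer::sum);

        // If inference is not possible, the type arguments can be given explicitly.
        Map<String, Integer> explicit = MapGroupReduceDemo.<String, String, String, Integer>mapGroupReduce(
                events,
                (key, value) -> entry(key.split("/")[0], 1),
                Integer::sum);

        System.out.println(eventsPerProject);                  // prints {p1=2, p2=1}
        System.out.println(explicit.equals(eventsPerProject)); // prints true
    }
}
```

The reducer is a BinaryOperator<VR> here because, as in Kafka's reduce, the aggregate and the incoming values share one type; changing the value type is the mapper's job.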
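In your case, instead of BiFunctions you would use Kafka Streams' own functional interfaces, which are what KStream.map and KGroupedStream.reduce actually accept:
- KeyValueMapper<K, V, KeyValue<T, U>> for the mapper
- Reducer<U> for the reducer.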
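The interface type matters because a lambda is converted to whatever functional interface the parameter declares, whereas a variable that already has type BiFunction or BinaryOperator is not a Reducer and cannot be passed as one; a method reference to its apply method bridges the gap. The plain-JDK sketch below shows this with a hypothetical local Reducer interface that has the same shape as Kafka Streams' Reducer<V> (V apply(V value1, V value2)) but is not the Kafka class; reduceAll is likewise a hypothetical stand-in for KGroupedStream.reduce.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

final class ReducerAdapterDemo {

    // Hypothetical stand-in with the same shape as Kafka Streams' Reducer<V>;
    // it is NOT org.apache.kafka.streams.kstream.Reducer.
    interface Reducer<V> {
        V apply(V value1, V value2);
    }

    // Hypothetical API method that, like KGroupedStream.reduce, only accepts its own interface.
    // Assumes a non-empty list.
    static <V> V reduceAll(List<V> values, Reducer<V> reducer) {
        V agg = values.get(0);
        for (V value : values.subList(1, values.size())) {
            agg = reducer.apply(agg, value);
        }
        return agg;
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3);

        // A lambda can target Reducer directly.
        int fromLambda = reduceAll(values, (a, b) -> a + b);

        // A BinaryOperator variable is a different type and cannot be passed as a Reducer,
        // but a method reference to its apply method adapts it.
        BinaryOperator<Integer> sum = Integer::sum;
        int fromOperator = reduceAll(values, sum::apply);

        System.out.println(fromLambda + " " + fromOperator); // prints "6 6"
    }
}
```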
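However, is this really much better than writing stream.map(M).groupByKey().reduce(R) every time? The more verbose version is more explicit, and given the relative size of the mapper and the reducer, you don't really save that much.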