Can I store data in RAM with Apache Spark?
I would like to know whether it is possible to store a bunch of strings in RAM with Apache Spark, for example. In fact, I want to query and update these strings based on the new input data that Apache Spark is processing. Also, if possible, can one node notify all the other nodes about which strings it has stored?
If you need more information about my project, feel free to ask.
Yes, what you need is the stateful streaming function mapWithState. It lets you update state that is cached in memory across streaming batches.
Note that you will need to enable checkpointing if you have not already done so.
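As a minimal sketch, checkpointing is enabled on the StreamingContext itself (the directory below is a placeholder; any fault-tolerant storage such as HDFS works in a real deployment):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("StatefulApp")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("/tmp/spark-checkpoint") // required for mapWithState; placeholder path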
Scala example usage:
def stateUpdateFunction(
    userId: UserId,
    newData: Option[UserAction], // StateSpec.function expects the new value as an Option
    stateData: State[UserSession]): UserModel = {
  val currentSession = stateData.get()  // Get current session data
  val updatedSession = ...              // Compute updated session using newData
  stateData.update(updatedSession)      // Update session data
  val userModel = ...                   // Compute model using updatedSession
  userModel                             // Send model downstream
}
// Stream of user actions, keyed by the user ID
val userActions = ... // stream of key-value tuples of (UserId, UserAction)
// Stream of data to commit
val userModels = userActions.mapWithState(StateSpec.function(stateUpdateFunction))
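For a concrete, self-contained version of the same pattern, here is a minimal runnable sketch that keeps a running word count in memory across batches (the socket source, port, and checkpoint path are assumptions for illustration, not part of the original example):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object StatefulWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("/tmp/spark-checkpoint") // mapWithState requires checkpointing

    // For each word, add the new count to whatever is already held in state
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(sum) // persist the updated count across batches
      (word, sum)       // emit the updated count downstream
    }

    val lines = ssc.socketTextStream("localhost", 9999) // assumed source, e.g. `nc -lk 9999`
    val words = lines.flatMap(_.split(" ")).map(word => (word, 1))
    val stateDstream = words.mapWithState(StateSpec.function(mappingFunc))
    stateDstream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}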
Java example usage:
// Update the cumulative count function
Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
    new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
      @Override
      public Tuple2<String, Integer> call(String word, Optional<Integer> one,
          State<Integer> state) {
        int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
        Tuple2<String, Integer> output = new Tuple2<>(word, sum);
        state.update(sum);
        return output;
      }
    };

// DStream of cumulative counts that get updated in every batch
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
    wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));
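In the Java snippet, wordsDstream is assumed to be a pair DStream of (word, 1) tuples, and initialRDD is an optional pair RDD used to seed the state before the first batch; the initialState(...) call can simply be dropped if you have no seed data.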