SparkStreaming 更新工作人员可读和可写的字典(java 地图)
SparkStreaming updating a dictionary (java map) readable and writable by workers
在我的代码中,我试图想出一种方法,让 Map 用作某些操作的参考。做个例子(伪代码):
JavaDstream miao = create new Dstream;
Map<String,String[]> dictionary = new HashMap<String,String[]>(); //would I need an Hashtable in this case?
miao.foreachRDD( rdd ->{
rdd.foreach(line ->{ //line is a String[]
if (dictionary.containsKey(line[0]){
if(dictionary.get(line[0])[INDEX].equals(line[INDEX])){
append line on csv file;
dictionary.put(line[0],line);
}else{
append line in another file;
}
}else{
dictionary.put(line[0],line);
}
})})
这种情况在我的应用程序中很常见:检查是否已处理,在一种情况下做事,在另一种情况下做事,所以我需要找到一种方法来做到这一点。
我今天阅读了很多关于广播变量的内容并检查了
- ways to update them,但在我的情况下看起来有点不可行。
- 我读到 Accumulators 不能被 workers 读取,所以 even personalized ones 对我没用。
- 最后,我尝试使用variables outside the Dstreams,但出现错误,我必须将变量更改为final(这行得通吗,我想知道:地图是最终的,但内容可能会改变) .
如果我将 Map 委托给另一个 class,一个可序列化的,并在那里将其设置为静态,我会得到一些可流式集合吗?据我了解,我认为不会:它将更改为“本地”,但其他工作人员不会收到任何更新。
编辑:正如承诺的那样,虽然我迟到了:
我要做的是:
- 获取一个字符串,其中包含一个具有公差范围的数字 dangerParameter
- 详细说明该字符串以创建设备密钥
- deviceKey在广播字典中匹配,初始字符串丰富了字典信息
- 此时,分析(用Map(deviceKey, [counter ; elapsed time]),剔除单个异常):如果dangerParameter超出边界->发送邮件
- 但是如果那个参数确实在外面,它发送的下一行可能会带来另一个触发值。
- 因此,我将其存储(现在在地图中,今天我将尝试@maasg 的解决方案)并将 deviceKey 作为键,一个时间戳。
- 所以将在地图中查找具有触发 deviceKey 的下一行,如果时间戳早于一个小时。
代码:
private static final Map<String, String> alertsAlreadySent = new Hashtable<String, String>(); //MAP done with id and timestamp
public static void sendMail(String whoTo, String[] whoIsDying){ //email address, string enriched with person data
if( (!alertsAlreadySent.containsKey(whoIsDying[CSVExampleDevice.DEVICE_ID])) //if it's not already in the map
|| //OR short circuited
((Long.parseLong(alertsAlreadySent.get(whoIsDying[CSVExampleDevice.DEVICE_ID])) - Long.parseLong(whoIsDying[CSVExampleDevice.TIMESTAMP]))>3600000)
){ // he was already dying, but an hour has already passed, so it may be a new alert
indeedSendMail(whoTo, whoIsDying); //a function to send the mail
alertsAlreadySent.put(whoIsDying[CSVExampleDevice.DEVICE_ID], whoIsDying[CSVExampleDevice.TIMESTAMP]);
//sent the email, we update the timestamp in the map
}
}
我身边还有其他这样的案例。
Stateful Dstream 可以轻松替代这些方法吗?
我认为这里的目的是在处理数据流时保留一些状态。同时,我们想对流中的数据进行如下分类:
- 密钥未知 => 已知
- 密钥已知,但其值与我们的状态不匹配 => 添加记录到文件 "A",记住新值
- 键已知,值等于我们知道的值=>添加记录到文件"B"
问:我们可以使用 Java 工人可读写的地图吗?
没有
在被视为分布式系统的 Spark Streaming 中,我们不能跨执行器使用可变集合并期望传播对该结构的更改。这些对象存在于创建它们的 JVM 中,所有更改都保留在 JVM 本地。有一些方法可以做到这一点(例如 CRDT),但这将需要执行者之间的额外消息传递基础设施。另一种选择可能是集中式存储,例如分布式缓存或数据库。
问:我们可以换个方式吗?
是
Spark Streaming 支持状态转换,允许我们对此类过程进行建模。我们确实需要改变方法才能使这项工作奏效。因此,我们将标记条目、构建我们的状态并将结果分组以优化 I/O 操作,而不是像原始问题旨在做的那样验证条件并采取行动。
(我打算使用 Scala。Java 方法与 API 非常相似,但增加了冗长且缺少模式匹配功能)
val dstream = ??? // my dstream here
// We define a state update function that implements our business logic in dealing with changing values
def stateUpdateFunction(
key: String,
value: Option[Array[String]],
state: State[String]): Option[(String,String, Array[String])] = {
val stateValue = state.getOption() // Get current value for the given key
val label = (stateValue, value) match {
case (None, Some(newValue)) => // new key!
state.update(newValue(0)) // Update session data
"NEW_KEY" // this is the resulting label for this state
case (Some(oldValue), Some(newValue)) if (oldValue == newValue(0)) => // we know the key. The value is the same. In this case we don't update the state
"SAME_VALUE"
case (Some(oldValue), Some(newValue)) if (oldValue != newValue(0)) => // the value is not the same, so we store the new value
state.update(newValue(0))
"NEW_VALUE"
case (None, None) => "NOP" // do nothing
case (Some(oldValue), None) => "NOP" // do nothing
}
value.map(v => (label, key, v)) // the computed a label for this key and the given value
}
val stateSpec = StateSpec.function(stateUpdateFunction _)
// transform the original stream into a key/value dsteam that preserves the original data in the value
val keyedDstream = dstream.map(elem => (elem(0), elem))
// Add labels to the data using a stateful transformation
val mappedDstream = dstream.mapWithState(stateSpec)
// remove the "None" in the stream
val labeledDataDStream = mappedDstream.filter(entry => entry != None) // or flatMap(identity)
// Now, labeledDataDStream contains our data labeled, we can proceed to filter it out in the different required subsets
val changedValuesDStream = labeledData.filter{case (label, key, value) => label == "NEW_VALUE"}
val sameValuesDStream = labeledData.filter{case (label, key, value) => label == "SAME_VALUE"}
val newKeyDStream = labeledData.filter{case (label, key, value) => label == "NEW_KEY"}
// we can save those datasets to disk (or store in a db, ...)
changedValuesDStream.saveAsTextFiles("...")
sameValueDStream.saveAsTextFiles("..."
在我的代码中,我试图想出一种方法,让 Map 用作某些操作的参考。做个例子(伪代码):
JavaDstream miao = create new Dstream;
Map<String,String[]> dictionary = new HashMap<String,String[]>(); //would I need an Hashtable in this case?
miao.foreachRDD( rdd ->{
rdd.foreach(line ->{ //line is a String[]
if (dictionary.containsKey(line[0]){
if(dictionary.get(line[0])[INDEX].equals(line[INDEX])){
append line on csv file;
dictionary.put(line[0],line);
}else{
append line in another file;
}
}else{
dictionary.put(line[0],line);
}
})})
这种情况在我的应用程序中很常见:检查是否已处理,在一种情况下做事,在另一种情况下做事,所以我需要找到一种方法来做到这一点。
我今天阅读了很多关于广播变量的内容并检查了
- ways to update them,但在我的情况下看起来有点不可行。
- 我读到 Accumulators 不能被 workers 读取,所以 even personalized ones 对我没用。
- 最后,我尝试使用variables outside the Dstreams,但出现错误,我必须将变量更改为final(这行得通吗,我想知道:地图是最终的,但内容可能会改变) .
如果我将 Map 委托给另一个 class,一个可序列化的,并在那里将其设置为静态,我会得到一些可流式集合吗?据我了解,我认为不会:它将更改为“本地”,但其他工作人员不会收到任何更新。
编辑:正如承诺的那样,虽然我迟到了: 我要做的是:
- 获取一个字符串,其中包含一个具有公差范围的数字 dangerParameter
- 详细说明该字符串以创建设备密钥
- deviceKey在广播字典中匹配,初始字符串丰富了字典信息
- 此时,分析(用Map(deviceKey, [counter ; elapsed time]),剔除单个异常):如果dangerParameter超出边界->发送邮件
- 但是如果那个参数确实在外面,它发送的下一行可能会带来另一个触发值。
- 因此,我将其存储(现在在地图中,今天我将尝试@maasg 的解决方案)并将 deviceKey 作为键,一个时间戳。
- 所以将在地图中查找具有触发 deviceKey 的下一行,如果时间戳早于一个小时。 代码:
private static final Map<String, String> alertsAlreadySent = new Hashtable<String, String>(); //MAP done with id and timestamp
public static void sendMail(String whoTo, String[] whoIsDying){ //email address, string enriched with person data
if( (!alertsAlreadySent.containsKey(whoIsDying[CSVExampleDevice.DEVICE_ID])) //if it's not already in the map
|| //OR short circuited
((Long.parseLong(alertsAlreadySent.get(whoIsDying[CSVExampleDevice.DEVICE_ID])) - Long.parseLong(whoIsDying[CSVExampleDevice.TIMESTAMP]))>3600000)
){ // he was already dying, but an hour has already passed, so it may be a new alert
indeedSendMail(whoTo, whoIsDying); //a function to send the mail
alertsAlreadySent.put(whoIsDying[CSVExampleDevice.DEVICE_ID], whoIsDying[CSVExampleDevice.TIMESTAMP]);
//sent the email, we update the timestamp in the map
}
}
我身边还有其他这样的案例。
Stateful Dstream 可以轻松替代这些方法吗?
我认为这里的目的是在处理数据流时保留一些状态。同时,我们想对流中的数据进行如下分类:
- 密钥未知 => 已知
- 密钥已知,但其值与我们的状态不匹配 => 添加记录到文件 "A",记住新值
- 键已知,值等于我们知道的值=>添加记录到文件"B"
问:我们可以使用 Java 工人可读写的地图吗?
没有
在被视为分布式系统的 Spark Streaming 中,我们不能跨执行器使用可变集合并期望传播对该结构的更改。这些对象存在于创建它们的 JVM 中,所有更改都保留在 JVM 本地。有一些方法可以做到这一点(例如 CRDT),但这将需要执行者之间的额外消息传递基础设施。另一种选择可能是集中式存储,例如分布式缓存或数据库。
问:我们可以换个方式吗?
是
Spark Streaming 支持状态转换,允许我们对此类过程进行建模。我们确实需要改变方法才能使这项工作奏效。因此,我们将标记条目、构建我们的状态并将结果分组以优化 I/O 操作,而不是像原始问题旨在做的那样验证条件并采取行动。 (我打算使用 Scala。Java 方法与 API 非常相似,但增加了冗长且缺少模式匹配功能)
val dstream = ??? // my dstream here
// We define a state update function that implements our business logic in dealing with changing values
def stateUpdateFunction(
key: String,
value: Option[Array[String]],
state: State[String]): Option[(String,String, Array[String])] = {
val stateValue = state.getOption() // Get current value for the given key
val label = (stateValue, value) match {
case (None, Some(newValue)) => // new key!
state.update(newValue(0)) // Update session data
"NEW_KEY" // this is the resulting label for this state
case (Some(oldValue), Some(newValue)) if (oldValue == newValue(0)) => // we know the key. The value is the same. In this case we don't update the state
"SAME_VALUE"
case (Some(oldValue), Some(newValue)) if (oldValue != newValue(0)) => // the value is not the same, so we store the new value
state.update(newValue(0))
"NEW_VALUE"
case (None, None) => "NOP" // do nothing
case (Some(oldValue), None) => "NOP" // do nothing
}
value.map(v => (label, key, v)) // the computed a label for this key and the given value
}
val stateSpec = StateSpec.function(stateUpdateFunction _)
// transform the original stream into a key/value dsteam that preserves the original data in the value
val keyedDstream = dstream.map(elem => (elem(0), elem))
// Add labels to the data using a stateful transformation
val mappedDstream = dstream.mapWithState(stateSpec)
// remove the "None" in the stream
val labeledDataDStream = mappedDstream.filter(entry => entry != None) // or flatMap(identity)
// Now, labeledDataDStream contains our data labeled, we can proceed to filter it out in the different required subsets
val changedValuesDStream = labeledData.filter{case (label, key, value) => label == "NEW_VALUE"}
val sameValuesDStream = labeledData.filter{case (label, key, value) => label == "SAME_VALUE"}
val newKeyDStream = labeledData.filter{case (label, key, value) => label == "NEW_KEY"}
// we can save those datasets to disk (or store in a db, ...)
changedValuesDStream.saveAsTextFiles("...")
sameValueDStream.saveAsTextFiles("..."