Spark Streaming: updating a dictionary (Java map) readable and writable by workers

In my code I'm trying to come up with a way to use a Map as a reference for certain operations. An example (pseudocode):

JavaDStream<String[]> miao = /* create new DStream */;
Map<String, String[]> dictionary = new HashMap<>(); // would I need a Hashtable in this case?
miao.foreachRDD(rdd -> {
    rdd.foreach(line -> { // line is a String[]
        if (dictionary.containsKey(line[0])) {
            if (dictionary.get(line[0])[INDEX].equals(line[INDEX])) {
                // append line to a CSV file
                dictionary.put(line[0], line);
            } else {
                // append line to another file
            }
        } else {
            dictionary.put(line[0], line);
        }
    });
});

This situation comes up often in my application: check whether something has already been processed, do one thing in one case and another thing otherwise, so I need to find a way to do this.
I've read a lot about broadcast variables today and looked into them.

If I delegate the Map to another class, a serializable one, and make it static there, would I get a collection that works across the stream? As I understand it, I don't think so: it would just become "local", and the other workers would never receive any update.
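For example, a minimal sketch of what I mean (Scala for brevity; a static field in a Java class behaves the same way, and the `Dictionary` object here is just a hypothetical helper, not a Spark API):

    // Singleton holding the shared map. Every JVM that loads this class
    // gets its own independent instance of the object.
    object Dictionary extends Serializable {
      val entries = new java.util.concurrent.ConcurrentHashMap[String, Array[String]]()
    }

    // This `put` runs on whichever executor processes the partition, so it
    // mutates only that executor's local copy; the driver and the other
    // executors never see the update.
    miao.foreachRDD { rdd =>
      rdd.foreach { line =>
        Dictionary.entries.put(line(0), line)
      }
    }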

EDIT: as promised, even though I'm late, here is what I do:

import java.util.Hashtable;
import java.util.Map;

private static final Map<String, String> alertsAlreadySent = new Hashtable<>(); // map of device id -> timestamp

public static void sendMail(String whoTo, String[] whoIsDying) { // email address, string array enriched with person data
    if (!alertsAlreadySent.containsKey(whoIsDying[CSVExampleDevice.DEVICE_ID]) // if it's not already in the map
            || // OR, short-circuited
        (Long.parseLong(whoIsDying[CSVExampleDevice.TIMESTAMP])
            - Long.parseLong(alertsAlreadySent.get(whoIsDying[CSVExampleDevice.DEVICE_ID]))) > 3600000) {
        // he was already dying, but an hour has already passed, so it may be a new alert
        indeedSendMail(whoTo, whoIsDying); // a function that actually sends the mail
        alertsAlreadySent.put(whoIsDying[CSVExampleDevice.DEVICE_ID], whoIsDying[CSVExampleDevice.TIMESTAMP]);
        // mail sent: update the timestamp in the map
    }
}

I have other cases like this around.
Could stateful DStreams easily replace these approaches?

I believe the intent here is to keep some state around while processing the stream, and at the same time to classify the incoming data as follows:

  • the key is unknown => remember it as known
  • the key is known, but its value does not match our state => add the record to file "A" and remember the new value
  • the key is known and the value equals the one we know => add the record to file "B"

Q: Can we use a Java map readable and writable by the workers?

No.

In Spark Streaming, which we should regard as a distributed system, we cannot use mutable collections across executors and expect changes to that structure to propagate. Those objects live in the JVM where they were created, and all changes stay local to that JVM. There are ways to achieve this (e.g. CRDTs), but they would require additional messaging infrastructure between the executors. Another option could be centralized storage, such as a distributed cache or a database.
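To make the centralized-storage option concrete, here is a sketch of the usual pattern: open one connection per partition and read/write the shared state through it. `KVStore` and `connect` are hypothetical stand-ins for your actual cache or database client (Redis, Cassandra, ...), not a Spark API:

    trait KVStore {
      def get(key: String): Option[String]
      def put(key: String, value: String): Unit
      def close(): Unit
    }

    def connect(): KVStore = ??? // open a connection to the shared store

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        val store = connect() // one connection per partition, not per record
        partition.foreach { line =>
          store.get(line(0)) match {
            case Some(_) => // key already stored, by whichever executor: handle the record
            case None    => store.put(line(0), line(1)) // first time we see it: remember (a field of) it
          }
        }
        store.close()
      }
    }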

Q: Can we do it some other way?

Spark Streaming supports stateful transformations that let us model this kind of process. We do need to change the approach for this to work: instead of checking a condition and acting on it, as the original question intends to do, we will label the entries, build up our state, and group the results in order to optimize the I/O operations. (I'm going to use Scala; the Java version is quite similar API-wise, with added verbosity and no pattern matching.)

val dstream: DStream[Array[String]] = ??? // my dstream here

// We define a state update function that implements our business logic for dealing with changing values

def stateUpdateFunction(
    key: String,
    value: Option[Array[String]],
    state: State[String]): Option[(String, String, Array[String])] = {

  val stateValue = state.getOption()    // get the current value for the given key
  val label = (stateValue, value) match {
    case (None, Some(newValue)) =>      // new key!
      state.update(newValue(0))         // update the session data
      "NEW_KEY"                         // this is the resulting label for this entry
    case (Some(oldValue), Some(newValue)) if oldValue == newValue(0) => // we know the key and the value is the same: we don't update the state
      "SAME_VALUE"
    case (Some(oldValue), Some(newValue)) if oldValue != newValue(0) => // the value is not the same, so we store the new value
      state.update(newValue(0))
      "NEW_VALUE"
    case (None, None)           => "NOP" // do nothing
    case (Some(oldValue), None) => "NOP" // do nothing
  }
  value.map(v => (label, key, v)) // return the computed label for this key together with the given value
}

val stateSpec = StateSpec.function(stateUpdateFunction _)

// transform the original stream into a key/value DStream that preserves the original data in the value
val keyedDstream = dstream.map(elem => (elem(0), elem))

// add labels to the data using the stateful transformation
val mappedDstream = keyedDstream.mapWithState(stateSpec)

// remove the "None" entries and unwrap the Option
val labeledDataDStream = mappedDstream.flatMap(x => x)

// Now labeledDataDStream contains our labeled data; we can proceed to split it into the different required subsets

val changedValuesDStream = labeledDataDStream.filter{case (label, key, value) => label == "NEW_VALUE"}
val sameValuesDStream    = labeledDataDStream.filter{case (label, key, value) => label == "SAME_VALUE"}
val newKeyDStream        = labeledDataDStream.filter{case (label, key, value) => label == "NEW_KEY"}

// we can save those datasets to disk (or store in a db, ...)

changedValuesDStream.saveAsTextFiles("...")
sameValuesDStream.saveAsTextFiles("...")
newKeyDStream.saveAsTextFiles("...")
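Two practical notes on this sketch: stateful transformations require a checkpoint directory, and `StateSpec` accepts a timeout that evicts keys receiving no data for a given period. The latter maps nicely onto the "resend the alert after an hour" rule from the question, although the timestamp comparison itself still belongs in the update function. A sketch, assuming `ssc` is the StreamingContext:

    import org.apache.spark.streaming.{Minutes, StateSpec}

    // mapWithState needs checkpointing
    ssc.checkpoint("...")

    // Evict the state of keys that have been idle for an hour. When a key
    // times out, the update function is invoked once more with value = None.
    val stateSpecWithTimeout = StateSpec.function(stateUpdateFunction _).timeout(Minutes(60))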