FiloDB + Spark Streaming 数据丢失

FiloDB + Spark Streaming Data Loss

我将 FiloDB 0.4 与 Cassandra 2.2.5 列和元存储一起使用,并尝试使用 Spark Streaming 1.6.1 + Jobserver 0.6.2 将数据插入其中。我使用以下代码插入数据:

messages.foreachRDD(parseAndSaveToFiloDb)

private static Function<JavaPairRDD<String, String>, Void> parseAndSaveToFiloDb = initialRdd -> {
        final List<RowWithSchema> parsedMessages = parseMessages(initialRdd.collect());
        final JavaRDD<Row> rdd = javaSparkContext.parallelize(createRows(parsedMessages));
        final DataFrame dataFrame = sqlContext.createDataFrame(rdd, generateSchema(rawMessages);

        dataFrame.write().format("filodb.spark")
                .option("database", keyspace)
                .option("dataset", dataset)
                .option("row_keys", rowKeys)
                .option("partition_keys", partitionKeys)
                .option("segment_key", segmentKey)
                .mode(saveMode).save();
        return null;
    };

段键为“:string /0”,行键设置为每行唯一的列,分区键设置为对所有行都是常量的列。换句话说,我所有的测试数据集都进入单个分区上的单个段。当我使用单个单节点 Spark 时,一切正常,我插入了所有数据,但是当我同时 运行 两个独立的单节点 Spark(不是集群)时,我迷路了即使我以几秒为间隔一条一条地发送消息,也大约有 30-60% 的数据。 我检查了 dataFrame.write() 是否针对每条消息执行,因此问题发生在这一行之后。 当我将段键设置为每行唯一的列时,所有数据都达到 Cassandra/FiloDB。

请为我提出具有 2 个独立火花的场景的解决方案。

@psyduck,这很可能是因为每个分区的数据一次只能在一个节点上摄取——对于 0.4 版本。因此,要坚持使用当前版本,您需要将数据划分为多个分区,然后确保每个工作人员只获得一个分区。实现上述目标的最简单方法是按分区键对数据进行排序。

不过,我强烈建议您升级到最新版本 - master (Spark 2.x / Scala 2.11) 或 spark1.6 分支 (spark 1.6 / Scala 2.10)。最新版本有很多 0.4 没有的变化可以解决你的问题:

  • 使用 Akka 集群自动将您的数据路由到正确的摄取节点。在这种情况下,使用相同的模型,您的数据将全部转到正确的节点并确保没有数据丢失
  • TimeUUID-based chunkID,因此即使多个工作人员(在脑裂的情况下)以某种方式写入同一分区,也可以避免数据丢失
  • 新的"segment less"数据模型,因此您无需定义任何段键,读写效率更高

随时联系我们的邮件列表,https://groups.google.com/forum/#!forum/filodb-discuss