将 JavaRDD<Status> 转换为 JavaRDD<String> 的问题

Problem with transformation JavaRDD<Status> to JavaRDD<String>

我正在尝试将推文从 Twitter 保存到 MongoDb 数据库。

我有 RDD<Status>,我正在尝试借助帮助将其转换为 JSON 格式 ObjectMapper.But 此转换存在一些问题(

public class Main {


    //set system credentials for access to twitter
    private static void setTwitterOAuth() {
        System.setProperty("twitter4j.oauth.consumerKey", TwitterCredentials.consumerKey);
        System.setProperty("twitter4j.oauth.consumerSecret", TwitterCredentials.consumerSecret);
        System.setProperty("twitter4j.oauth.accessToken", TwitterCredentials.accessToken);
        System.setProperty("twitter4j.oauth.accessTokenSecret", TwitterCredentials.accessTokenSecret);
    }


    public static void main(String [] args) {

        setTwitterOAuth();

        SparkConf conf = new SparkConf().setMaster("local[2]")
                                        .setAppName("SparkTwitter");
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        JavaStreamingContext jssc = new JavaStreamingContext(sparkContext, new Duration(1000));
        JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);

        //Stream that contains just tweets in english
        JavaDStream<Status> enTweetsDStream=twitterStream.filter((status) -> "en".equalsIgnoreCase(status.getLang()));
        enTweetsDStream.persist(StorageLevel.MEMORY_AND_DISK());


        enTweetsDStream.print();
        jssc.start();
        jssc.awaitTermination();
    }

    static void saveRawTweetsToMondoDb(JavaRDD<Status> rdd,JavaSparkContext sparkContext) {
     try {
            ObjectMapper objectMapper = new ObjectMapper();
            SQLContext sqlContext = new SQLContext(sparkContext);
            JavaRDD<String> tweet =  rdd.map(status -> objectMapper.writeValueAsString(status));

            DataFrame dataFrame = sqlContext.read().json(tweet);

            Map<String, String> writeOverrides = new HashMap<>();
            writeOverrides.put("uri", "mongodb://127.0.0.1/forensicdb.LiveRawTweets");
            WriteConfig writeConfig = WriteConfig.create(sparkContext).withJavaOptions(writeOverrides);
            MongoSpark.write(dataFrame).option("collection", "LiveRawTweets").mode("append").save();

        } catch (Exception e) {
            System.out.println("Error saving to database");
        }
    }

JavaRDD<String> tweet =  rdd.map(status -> objectMapper.writeValueAsString(status));

此处需要 problem.Incompatible 类型 JavaRDD<String> 但映射被推断为 javaRDD<R>

不幸的是,

Java 类型推断并不总是非常聪明,所以我在这些情况下所做的是提取我的 lambda 的所有位作为变量,直到我找到一个 Java 不能给出准确的类型。然后我给表达式我认为它应该有的类型,看看为什么 Java 抱怨它。有时它只是编译器的限制,您必须显式 "cast" 表达式作为所需的类型,有时您会发现代码有问题。在你的情况下,代码对我来说很好,所以一定有其他东西。

但是我有一个评论:在这里你支付一次 JSON 序列化的成本(从 Status 到 JSON 字符串)然后反序列化(从 JSON字符串到 Row)。另外,您没有为 Dataset 提供任何架构,因此它必须两次传递数据(或根据您的配置对其进行采样)以推断架构。如果数据很大,所有这些都可能非常昂贵。如果性能是一个问题并且 Status 相对简单,我建议您直接编写从 StatusRow 的转换。

另一个 "by the way":您正在隐式序列化您的 ObjectMapper,很可能您不想这样做。似乎 class 确实支持 Java 序列化,但支持 special logic。由于 Spark 的默认配置是使用 Kryo(其性能比 Java 序列化好得多),我怀疑它在使用默认 FieldSerializer 时是否会做正确的事情。您有三个选择:

  • 使对象映射器静态化以避免对其进行序列化
  • 将您的 Kryo 注册器配置为 serialize/deserialize 类型 ObjectMapper 的对象,并进行 Java 序列化。那会奏效,但不值得付出努力。
  • 到处使用Java 序列化而不是 Kryo。馊主意!它很慢并且使用了很多space(内存和磁盘取决于序列化对象的写入位置)。