将 JavaRDD<Status> 转换为 JavaRDD<String> 的问题
Problem with transformation JavaRDD<Status> to JavaRDD<String>
我正在尝试将推文从 Twitter 保存到 MongoDb 数据库。
我有 RDD<Status>
,我正在尝试借助帮助将其转换为 JSON 格式 ObjectMapper.But 此转换存在一些问题(
public class Main {
//set system credentials for access to twitter
private static void setTwitterOAuth() {
System.setProperty("twitter4j.oauth.consumerKey", TwitterCredentials.consumerKey);
System.setProperty("twitter4j.oauth.consumerSecret", TwitterCredentials.consumerSecret);
System.setProperty("twitter4j.oauth.accessToken", TwitterCredentials.accessToken);
System.setProperty("twitter4j.oauth.accessTokenSecret", TwitterCredentials.accessTokenSecret);
}
public static void main(String [] args) {
setTwitterOAuth();
SparkConf conf = new SparkConf().setMaster("local[2]")
.setAppName("SparkTwitter");
JavaSparkContext sparkContext = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaStreamingContext(sparkContext, new Duration(1000));
JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);
//Stream that contains just tweets in english
JavaDStream<Status> enTweetsDStream=twitterStream.filter((status) -> "en".equalsIgnoreCase(status.getLang()));
enTweetsDStream.persist(StorageLevel.MEMORY_AND_DISK());
enTweetsDStream.print();
jssc.start();
jssc.awaitTermination();
}
static void saveRawTweetsToMondoDb(JavaRDD<Status> rdd,JavaSparkContext sparkContext) {
try {
ObjectMapper objectMapper = new ObjectMapper();
SQLContext sqlContext = new SQLContext(sparkContext);
JavaRDD<String> tweet = rdd.map(status -> objectMapper.writeValueAsString(status));
DataFrame dataFrame = sqlContext.read().json(tweet);
Map<String, String> writeOverrides = new HashMap<>();
writeOverrides.put("uri", "mongodb://127.0.0.1/forensicdb.LiveRawTweets");
WriteConfig writeConfig = WriteConfig.create(sparkContext).withJavaOptions(writeOverrides);
MongoSpark.write(dataFrame).option("collection", "LiveRawTweets").mode("append").save();
} catch (Exception e) {
System.out.println("Error saving to database");
}
}
JavaRDD<String> tweet = rdd.map(status -> objectMapper.writeValueAsString(status));
此处需要 problem.Incompatible 类型 JavaRDD<String>
但映射被推断为 javaRDD<R>
不幸的是,Java 类型推断并不总是非常聪明,所以我在这些情况下所做的是提取我的 lambda 的所有位作为变量,直到我找到一个 Java 不能给出准确的类型。然后我给表达式我认为它应该有的类型,看看为什么 Java 抱怨它。有时它只是编译器的限制,您必须显式 "cast" 表达式作为所需的类型,有时您会发现代码有问题。在你的情况下,代码对我来说很好,所以一定有其他东西。
但是我有一个评论:在这里你支付一次 JSON 序列化的成本(从 Status
到 JSON 字符串)然后反序列化(从 JSON字符串到 Row
)。另外,您没有为 Dataset
提供任何架构,因此它必须两次传递数据(或根据您的配置对其进行采样)以推断架构。如果数据很大,所有这些都可能非常昂贵。如果性能是一个问题并且 Status
相对简单,我建议您直接编写从 Status
到 Row
的转换。
另一个 "by the way":您正在隐式序列化您的 ObjectMapper
,很可能您不想这样做。似乎 class 确实支持 Java 序列化,但支持 special logic。由于 Spark 的默认配置是使用 Kryo(其性能比 Java 序列化好得多),我怀疑它在使用默认 FieldSerializer
时是否会做正确的事情。您有三个选择:
- 使对象映射器静态化以避免对其进行序列化
- 将您的 Kryo 注册器配置为 serialize/deserialize 类型
ObjectMapper
的对象,并进行 Java 序列化。那会奏效,但不值得付出努力。
- 到处使用Java 序列化而不是 Kryo。馊主意!它很慢并且使用了很多space(内存和磁盘取决于序列化对象的写入位置)。
我正在尝试将推文从 Twitter 保存到 MongoDb 数据库。
我有 RDD<Status>
,我正在尝试借助帮助将其转换为 JSON 格式 ObjectMapper.But 此转换存在一些问题(
public class Main {
//set system credentials for access to twitter
private static void setTwitterOAuth() {
System.setProperty("twitter4j.oauth.consumerKey", TwitterCredentials.consumerKey);
System.setProperty("twitter4j.oauth.consumerSecret", TwitterCredentials.consumerSecret);
System.setProperty("twitter4j.oauth.accessToken", TwitterCredentials.accessToken);
System.setProperty("twitter4j.oauth.accessTokenSecret", TwitterCredentials.accessTokenSecret);
}
public static void main(String [] args) {
setTwitterOAuth();
SparkConf conf = new SparkConf().setMaster("local[2]")
.setAppName("SparkTwitter");
JavaSparkContext sparkContext = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaStreamingContext(sparkContext, new Duration(1000));
JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);
//Stream that contains just tweets in english
JavaDStream<Status> enTweetsDStream=twitterStream.filter((status) -> "en".equalsIgnoreCase(status.getLang()));
enTweetsDStream.persist(StorageLevel.MEMORY_AND_DISK());
enTweetsDStream.print();
jssc.start();
jssc.awaitTermination();
}
static void saveRawTweetsToMondoDb(JavaRDD<Status> rdd,JavaSparkContext sparkContext) {
try {
ObjectMapper objectMapper = new ObjectMapper();
SQLContext sqlContext = new SQLContext(sparkContext);
JavaRDD<String> tweet = rdd.map(status -> objectMapper.writeValueAsString(status));
DataFrame dataFrame = sqlContext.read().json(tweet);
Map<String, String> writeOverrides = new HashMap<>();
writeOverrides.put("uri", "mongodb://127.0.0.1/forensicdb.LiveRawTweets");
WriteConfig writeConfig = WriteConfig.create(sparkContext).withJavaOptions(writeOverrides);
MongoSpark.write(dataFrame).option("collection", "LiveRawTweets").mode("append").save();
} catch (Exception e) {
System.out.println("Error saving to database");
}
}
JavaRDD<String> tweet = rdd.map(status -> objectMapper.writeValueAsString(status));
此处需要 problem.Incompatible 类型 JavaRDD<String>
但映射被推断为 javaRDD<R>
Java 类型推断并不总是非常聪明,所以我在这些情况下所做的是提取我的 lambda 的所有位作为变量,直到我找到一个 Java 不能给出准确的类型。然后我给表达式我认为它应该有的类型,看看为什么 Java 抱怨它。有时它只是编译器的限制,您必须显式 "cast" 表达式作为所需的类型,有时您会发现代码有问题。在你的情况下,代码对我来说很好,所以一定有其他东西。
但是我有一个评论:在这里你支付一次 JSON 序列化的成本(从 Status
到 JSON 字符串)然后反序列化(从 JSON字符串到 Row
)。另外,您没有为 Dataset
提供任何架构,因此它必须两次传递数据(或根据您的配置对其进行采样)以推断架构。如果数据很大,所有这些都可能非常昂贵。如果性能是一个问题并且 Status
相对简单,我建议您直接编写从 Status
到 Row
的转换。
另一个 "by the way":您正在隐式序列化您的 ObjectMapper
,很可能您不想这样做。似乎 class 确实支持 Java 序列化,但支持 special logic。由于 Spark 的默认配置是使用 Kryo(其性能比 Java 序列化好得多),我怀疑它在使用默认 FieldSerializer
时是否会做正确的事情。您有三个选择:
- 使对象映射器静态化以避免对其进行序列化
- 将您的 Kryo 注册器配置为 serialize/deserialize 类型
ObjectMapper
的对象,并进行 Java 序列化。那会奏效,但不值得付出努力。 - 到处使用Java 序列化而不是 Kryo。馊主意!它很慢并且使用了很多space(内存和磁盘取决于序列化对象的写入位置)。