Spark SQL 在 Spark Streaming (KafkaStream) 中失败
Spark SQL failed in Spark Streaming (KafkaStream)
我在 Spark 流作业中使用 Spark SQL 在 Hive table 中进行搜索。
Kafka 流媒体工作正常,没有问题。如果我 运行 hiveContext.runSqlHive(sqlQuery);
在 directKafkaStream.foreachRDD
之外,它可以正常工作,没有问题。但我需要在流式作业中进行 Hive-Table 查找。使用 JDBC (jdbc:hive2://
) 可以,但我想使用 Spark SQL。
我的源代码中重要的地方如下:
// set context
SparkConf sparkConf = new SparkConf().setAppName(appName).set("spark.driver.allowMultipleContexts", "true");
SparkContext sparkSqlContext = new SparkContext(sparkConf);
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(batchDuration));
HiveContext hiveContext = new HiveContext(sparkSqlContext);
// Initialize Direct Spark Kafka Stream. Starts from top
JavaPairInputDStream<String, String> directKafkaStream =
KafkaUtils.createDirectStream(streamingContext,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet);
// work on stream
directKafkaStream.foreachRDD((Function<JavaPairRDD<String, String>, Void>) rdd -> {
rdd.foreachPartition(tuple2Iterator -> {
// get message
Tuple2<String, String> item = tuple2Iterator.next();
// lookup
String sqlQuery = "SELECT something FROM somewhere";
Seq<String> resultSequence = hiveContext.runSqlHive(sqlQuery);
List<String> result = scala.collection.JavaConversions.seqAsJavaList(resultSequence);
});
return null;
});
// Start the computation
streamingContext.start();
streamingContext.awaitTermination();
我没有得到有意义的错误,即使我用 try-catch 包围。
我希望有人能提供帮助 - 谢谢。
//编辑:
解决方案如下所示:
// work on stream
directKafkaStream.foreachRDD((Function<JavaPairRDD<String, String>, Void>) rdd -> {
// driver
Map<String, String> lookupMap = getResult(hiveContext); //something with hiveContext.runSqlHive(sqlQuery);
rdd.foreachPartition(tuple2Iterator -> {
// worker
while (tuple2Iterator != null && tuple2Iterator.hasNext()) {
// get message
Tuple2<String, String> item = tuple2Iterator.next();
// lookup
String result = lookupMap.get(item._2());
}
});
return null;
});
只是因为您想使用 Spark SQL 它不会使它成为可能。 Spark 的第一条规则是没有嵌套操作、转换或分布式数据结构。
如果您可以将您的查询表达为例如连接,您可以将其推到更高的级别 foreachRDD
,这几乎耗尽了您在此处使用 Spark SQL 的选项:
directKafkaStream.foreachRDD(rdd ->
hiveContext.runSqlHive(sqlQuery)
rdd.foreachPartition(...)
)
否则直接 JDBC 连接可能是一个有效选项。
我在 Spark 流作业中使用 Spark SQL 在 Hive table 中进行搜索。
Kafka 流媒体工作正常,没有问题。如果我 运行 hiveContext.runSqlHive(sqlQuery);
在 directKafkaStream.foreachRDD
之外,它可以正常工作,没有问题。但我需要在流式作业中进行 Hive-Table 查找。使用 JDBC (jdbc:hive2://
) 可以,但我想使用 Spark SQL。
我的源代码中重要的地方如下:
// set context
SparkConf sparkConf = new SparkConf().setAppName(appName).set("spark.driver.allowMultipleContexts", "true");
SparkContext sparkSqlContext = new SparkContext(sparkConf);
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(batchDuration));
HiveContext hiveContext = new HiveContext(sparkSqlContext);
// Initialize Direct Spark Kafka Stream. Starts from top
JavaPairInputDStream<String, String> directKafkaStream =
KafkaUtils.createDirectStream(streamingContext,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet);
// work on stream
directKafkaStream.foreachRDD((Function<JavaPairRDD<String, String>, Void>) rdd -> {
rdd.foreachPartition(tuple2Iterator -> {
// get message
Tuple2<String, String> item = tuple2Iterator.next();
// lookup
String sqlQuery = "SELECT something FROM somewhere";
Seq<String> resultSequence = hiveContext.runSqlHive(sqlQuery);
List<String> result = scala.collection.JavaConversions.seqAsJavaList(resultSequence);
});
return null;
});
// Start the computation
streamingContext.start();
streamingContext.awaitTermination();
我没有得到有意义的错误,即使我用 try-catch 包围。
我希望有人能提供帮助 - 谢谢。
//编辑: 解决方案如下所示:
// work on stream
directKafkaStream.foreachRDD((Function<JavaPairRDD<String, String>, Void>) rdd -> {
// driver
Map<String, String> lookupMap = getResult(hiveContext); //something with hiveContext.runSqlHive(sqlQuery);
rdd.foreachPartition(tuple2Iterator -> {
// worker
while (tuple2Iterator != null && tuple2Iterator.hasNext()) {
// get message
Tuple2<String, String> item = tuple2Iterator.next();
// lookup
String result = lookupMap.get(item._2());
}
});
return null;
});
只是因为您想使用 Spark SQL 它不会使它成为可能。 Spark 的第一条规则是没有嵌套操作、转换或分布式数据结构。
如果您可以将您的查询表达为例如连接,您可以将其推到更高的级别 foreachRDD
,这几乎耗尽了您在此处使用 Spark SQL 的选项:
directKafkaStream.foreachRDD(rdd ->
hiveContext.runSqlHive(sqlQuery)
rdd.foreachPartition(...)
)
否则直接 JDBC 连接可能是一个有效选项。