Spark Streaming: Convert Dataset&lt;Row&gt; to Dataset&lt;CustomObject&gt; in Java
I recently started working with Apache Spark and ran into a requirement where I need to read a Kafka stream and feed the data into Cassandra. While doing so, I hit a problem: the stream is SQL-based (Structured Streaming), whereas the Cassandra connector works on RDDs (I may be wrong here, please correct me), and I struggled to make it work. I somehow got it working for now, but I'm not sure this is the right way to implement it.
Below is the code.
Schema:
StructType getSchema() {
    StructField[] structFields = new StructField[]{
            new StructField("id", DataTypes.LongType, true, Metadata.empty()),
            new StructField("name", DataTypes.StringType, true, Metadata.empty()),
            new StructField("cat", DataTypes.StringType, true, Metadata.empty()),
            new StructField("tag", DataTypes.createArrayType(DataTypes.StringType), true, Metadata.empty())
    };
    return new StructType(structFields);
}
Stream reader:
Dataset<Row> results = kafkaDataset.select(
        col("key").cast("string"),
        from_json(col("value").cast("string"), getSchema()).as("value"),
        col("topic"),
        col("partition"),
        col("offset"),
        col("timestamp"),
        col("timestampType"));

results.select("value.*")
        .writeStream()
        .foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
            @Override
            public void call(Dataset<Row> dataset, Long batchId) throws Exception {
                ObjectMapper mapper = new ObjectMapper();
                List<DealFeedSchema> list = new ArrayList<>();
                List<Row> rowList = dataset.collectAsList();
                if (!rowList.isEmpty()) {
                    rowList.forEach(row -> {
                        if (row == null) {
                            logger.info("Null DataSet");
                        } else {
                            try {
                                list.add(mapper.readValue(row.json(), DealFeedSchema.class));
                            } catch (JsonProcessingException e) {
                                logger.error("error parsing Data", e);
                            }
                        }
                    });
                    JavaRDD<DealFeedSchema> rdd = new JavaSparkContext(session.sparkContext()).parallelize(list);
                    javaFunctions(rdd).writerBuilder(Constants.CASSANDRA_KEY_SPACE,
                            Constants.CASSANDRA_DEAL_TABLE_SPACE, mapToRow(DealFeedSchema.class)).saveToCassandra();
                }
            }
        })
        .start().awaitTermination();
While this works fine, I'd like to know whether there is a better way; if so, please let me know how to achieve it.
Thanks in advance.
For those looking for a way to do this, you can refer to this code as an alternative approach. :)
You can write data from Spark Structured Streaming without converting to RDDs: just switch to Spark Cassandra Connector 2.5.0, which added this capability, together with much more stuff.
With it, your code will look like the following (I don't have a Java sample, but it should be similar):
val query = streamingCountsDF.writeStream
.outputMode(OutputMode.Update)
.format("org.apache.spark.sql.cassandra")
.option("checkpointLocation", "some_checkpoint_location")
.option("keyspace", "test")
.option("table", "sttest_tweets")
.start()
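A rough Java equivalent of the Scala snippet above might look like this. This is an untested sketch, assuming Spark Cassandra Connector 2.5.0+ is on the classpath (which registers the `org.apache.spark.sql.cassandra` source for streaming writes); it reuses the `results` dataset from the question, and the checkpoint path is a placeholder:

// Sketch: write the parsed stream straight to Cassandra, no RDD conversion.
StreamingQuery query = results.select("value.*")
        .writeStream()
        .outputMode(OutputMode.Update())
        .format("org.apache.spark.sql.cassandra")
        .option("checkpointLocation", "some_checkpoint_location") // placeholder path
        .option("keyspace", Constants.CASSANDRA_KEY_SPACE)
        .option("table", Constants.CASSANDRA_DEAL_TABLE_SPACE)
        .start();
query.awaitTermination();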
Convert Dataset&lt;Row&gt; to Dataset&lt;DealFeedSchema&gt; in Java
1. Java Bean for DealFeedSchema
import java.util.List;

public class DealFeedSchema {
    private long id;
    private String name;
    private String cat;
    private List<String> tag;

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getCat() {
        return cat;
    }

    public void setCat(String cat) {
        this.cat = cat;
    }

    public List<String> getTag() {
        return tag;
    }

    public void setTag(List<String> tag) {
        this.tag = tag;
    }
}
2. Load the test data
Dataset<Row> dataFrame = spark.createDataFrame(Arrays.asList(
        RowFactory.create(1L, "foo", "cat1", Arrays.asList("tag1", "tag2"))
), getSchema());
dataFrame.show(false);
dataFrame.printSchema();
/**
* +---+----+----+------------+
* |id |name|cat |tag |
* +---+----+----+------------+
* |1 |foo |cat1|[tag1, tag2]|
* +---+----+----+------------+
*
* root
* |-- id: long (nullable = true)
* |-- name: string (nullable = true)
* |-- cat: string (nullable = true)
* |-- tag: array (nullable = true)
* | |-- element: string (containsNull = true)
*/
3. Convert Dataset<Row> to Dataset<DealFeedSchema>
Dataset<DealFeedSchema> dealFeedSchemaDataset = dataFrame.as(Encoders.bean(DealFeedSchema.class));
dealFeedSchemaDataset.show(false);
dealFeedSchemaDataset.printSchema();
/**
* +---+----+----+------------+
* |id |name|cat |tag |
* +---+----+----+------------+
* |1 |foo |cat1|[tag1, tag2]|
* +---+----+----+------------+
*
* root
* |-- id: long (nullable = true)
* |-- name: string (nullable = true)
* |-- cat: string (nullable = true)
* |-- tag: array (nullable = true)
* | |-- element: string (containsNull = true)
*/
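One caveat with `Encoders.bean`: the DataFrame's column names and types must match the bean's property names, or the conversion fails at analysis time. If they differ, rename and cast first. A sketch, assuming `org.apache.spark.sql.functions.col` is statically imported and using a hypothetical mismatched source column `raw_id`:

// Hypothetical case: source column "raw_id" does not match the bean property "id".
Dataset<DealFeedSchema> ds = dataFrame
        .withColumnRenamed("raw_id", "id")         // align column name with the bean getter/setter
        .withColumn("id", col("id").cast("long"))  // align type with the bean field
        .as(Encoders.bean(DealFeedSchema.class));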