Spark Streaming: Convert Dataset&lt;Row&gt; to Dataset&lt;CustomObject&gt; in Java

I recently started working with Apache Spark and came across a requirement where I need to read a Kafka stream and feed the data into Cassandra. While doing so I ran into a problem: the stream is SQL/Dataset based, while the Cassandra connector works on RDDs (I may be wrong here, please correct me), so I was struggling to make it work. I somehow got it working for now, but I'm not sure whether this is the right way to implement it.

Below is the code.

Schema

StructType getSchema() {
    StructField[] structFields = new StructField[]{
            new StructField("id", DataTypes.LongType, true, Metadata.empty()),
            new StructField("name", DataTypes.StringType, true, Metadata.empty()),
            new StructField("cat", DataTypes.StringType, true, Metadata.empty()),
            new StructField("tag", DataTypes.createArrayType(DataTypes.StringType), true, Metadata.empty())
    };
    return new StructType(structFields);
}

Stream reader

  Dataset<Row> results = kafkaDataset.select(
                col("key").cast("string"),
                from_json(col("value").cast("string"), getSchema()).as("value"),
                col("topic"),
                col("partition"),
                col("offset"),
                col("timestamp"),
                col("timestampType"));

        results.select("value.*")
                .writeStream()
                .foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
                    @Override
                    public void call(Dataset<Row> dataset, Long batchId) throws Exception {
                        ObjectMapper mapper = new ObjectMapper();
                        List<DealFeedSchema> list = new ArrayList<>();
                        List<Row> rowList = dataset.collectAsList();
                        if (!rowList.isEmpty()) {
                            rowList.forEach(row -> {
                                if (row == null) logger.info("Null DataSet");
                                else {
                                    try {
                                        list.add(mapper.readValue(row.json(), DealFeedSchema.class));
                                    } catch (JsonProcessingException e) {
                                        logger.error("error parsing Data", e);
                                    }
                                }
                            });
                            JavaRDD<DealFeedSchema> rdd = new JavaSparkContext(session.sparkContext()).parallelize(list);
                            javaFunctions(rdd).writerBuilder(Constants.CASSANDRA_KEY_SPACE,
                                    Constants.CASSANDRA_DEAL_TABLE_SPACE, mapToRow(DealFeedSchema.class)).saveToCassandra();
                        }
                    }
                }).start().awaitTermination();
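For reference, the kafkaDataset above is assumed to come from the standard Kafka source; a minimal sketch, where the broker address, topic name, and starting offsets are placeholder values:

// Sketch of how kafkaDataset could be created; broker, topic, and offsets
// below are placeholders, and "session" is the active SparkSession.
Dataset<Row> kafkaDataset = session.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "deal-feed-topic")
        .option("startingOffsets", "latest")
        .load();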

While this works fine, I need to know whether there is a better way to do this; if there is, please tell me how to achieve it.

Thanks in advance.

For those who are looking for a way to do this, you can refer to the following code as an alternative approach. :)

You can just write data from Spark Structured Streaming without converting it to an RDD: simply switch to Spark Cassandra Connector 2.5.0, which added this capability, together with much more other stuff.

When you use it, your code will look like the following (I don't have a Java example, but it should be similar to this):

val query = streamingCountsDF.writeStream
  .outputMode(OutputMode.Update)
  .format("org.apache.spark.sql.cassandra")
  .option("checkpointLocation", "some_checkpoint_location")
  .option("keyspace", "test")
  .option("table", "sttest_tweets")
  .start()
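
A Java version would be along the same lines (a sketch only, assuming streamingCountsDF is a streaming Dataset&lt;Row&gt; and Spark Cassandra Connector 2.5.0+ is on the classpath):

import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;

// Hypothetical Java equivalent of the Scala snippet above.
StreamingQuery query = streamingCountsDF.writeStream()
        .outputMode(OutputMode.Update())
        .format("org.apache.spark.sql.cassandra")
        .option("checkpointLocation", "some_checkpoint_location")
        .option("keyspace", "test")
        .option("table", "sttest_tweets")
        .start();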

Convert Dataset&lt;Row&gt; to Dataset&lt;DealFeedSchema&gt; in Java

1. Java Bean for DealFeedSchema


import java.util.List;

public class DealFeedSchema {
    private long id;
    private String name;
    private String cat;
    private List<String> tag;

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getCat() {
        return cat;
    }

    public void setCat(String cat) {
        this.cat = cat;
    }

    public List<String> getTag() {
        return tag;
    }

    public void setTag(List<String> tag) {
        this.tag = tag;
    }
}
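
(Note that Encoders.bean expects exactly this shape: a publicly accessible class with a no-arg constructor and getter/setter pairs whose names match the column names.)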

2. Load the test data

        Dataset<Row> dataFrame = spark.createDataFrame(Arrays.asList(
                RowFactory.create(1L, "foo", "cat1", Arrays.asList("tag1", "tag2"))
        ), getSchema());
        dataFrame.show(false);
        dataFrame.printSchema();
        /**
         * +---+----+----+------------+
         * |id |name|cat |tag         |
         * +---+----+----+------------+
         * |1  |foo |cat1|[tag1, tag2]|
         * +---+----+----+------------+
         *
         * root
         *  |-- id: long (nullable = true)
         *  |-- name: string (nullable = true)
         *  |-- cat: string (nullable = true)
         *  |-- tag: array (nullable = true)
         *  |    |-- element: string (containsNull = true)
         */

3. Convert Dataset<Row> to Dataset<DealFeedSchema>

        Dataset<DealFeedSchema> dealFeedSchemaDataset = dataFrame.as(Encoders.bean(DealFeedSchema.class));
        dealFeedSchemaDataset.show(false);
        dealFeedSchemaDataset.printSchema();
        /**
         * +---+----+----+------------+
         * |id |name|cat |tag         |
         * +---+----+----+------------+
         * |1  |foo |cat1|[tag1, tag2]|
         * +---+----+----+------------+
         *
         * root
         *  |-- id: long (nullable = true)
         *  |-- name: string (nullable = true)
         *  |-- cat: string (nullable = true)
         *  |-- tag: array (nullable = true)
         *  |    |-- element: string (containsNull = true)
         */
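
Applied back to the streaming job in the question, the same conversion lets foreachBatch write the typed dataset straight through the connector instead of collecting rows to the driver and re-parallelizing them; a sketch, reusing the question's names:

        // Sketch only: map each micro-batch to a typed Dataset with Encoders.bean
        // and write its underlying RDD, avoiding collectAsList() on the driver.
        results.select("value.*")
                .writeStream()
                .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (dataset, batchId) -> {
                    Dataset<DealFeedSchema> typed = dataset.as(Encoders.bean(DealFeedSchema.class));
                    javaFunctions(typed.javaRDD()).writerBuilder(
                            Constants.CASSANDRA_KEY_SPACE,
                            Constants.CASSANDRA_DEAL_TABLE_SPACE,
                            mapToRow(DealFeedSchema.class)).saveToCassandra();
                })
                .start()
                .awaitTermination();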