Catch only the payload of CDC in PySpark structured streaming?

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 pyspark-shell'

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType

kafka_topic_name = "test-spark"
kafka_bootstrap_servers = '192.168.1.3:9092'

spark = SparkSession \
    .builder \
    .appName("PySpark Structured Streaming with Kafka and Message Format as JSON") \
    .master("local[*]") \
    .getOrCreate()

# Construct a streaming DataFrame that reads from the test-spark topic
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .load()

print("Printing Schema of df: ")
df.printSchema()

# Kafka delivers the message value as binary, so cast it to a string before parsing
df1 = df.selectExpr("CAST(value AS STRING)", "timestamp")
df1.printSchema()

schema = StructType() \
    .add("name", StringType()) \
    .add("type", StringType())

df2 = df1 \
    .select(from_json(col("value"), schema).alias("records"), "timestamp")
df3 = df2.select("records.*", "timestamp")

print("Printing Schema of df3: ")
df3.printSchema()

records_write_stream = df3 \
    .writeStream \
    .trigger(processingTime='5 seconds') \
    .outputMode("update") \
    .option("truncate", "false") \
    .format("console") \
    .start()
records_write_stream.awaitTermination()

print("Stream Data Processing Application Completed.")

After some more searching, I found how to display and capture only the payload part of the CDC messages.

  • You need to add this to your worker.properties:
value.converter=org.apache.kafka.connect.json.JsonConverter

value.converter.schemas.enable=false

Alternatively, you can modify your Debezium connector to set value.converter.schemas.enable=false, and then you will have just the payload field to work with.
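If the connector is registered through the Kafka Connect REST API, the same converter override can be supplied per connector instead of in worker.properties. A minimal sketch, assuming a MySQL source (the connector name and class are placeholders, and the usual database connection settings are omitted):

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}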

Otherwise, you can create a class/schema for the whole object and parse it with the from_json() function, or keep the value as a string and use the get_json_object() Spark function to extract the data; both approaches are sketched below.
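A minimal sketch of both options, starting from the df1 DataFrame above. The envelope layout follows Debezium's payload.before / payload.after / payload.op convention, and the row fields (name, type) are assumed from the question's schema:

from pyspark.sql.functions import col, from_json, get_json_object
from pyspark.sql.types import StructType, StringType

# Option 1: declare a schema covering the full envelope, then select only payload
row_schema = StructType() \
    .add("name", StringType()) \
    .add("type", StringType())
envelope_schema = StructType() \
    .add("payload", StructType()
         .add("before", row_schema)
         .add("after", row_schema)
         .add("op", StringType()))

parsed = df1.select(from_json(col("value"), envelope_schema).alias("msg"), "timestamp")
changes = parsed.select("msg.payload.after.*", "msg.payload.op", "timestamp")

# Option 2: leave the value as a string and pull out individual JSON paths
extracted = df1.select(
    get_json_object(col("value"), "$.payload.after.name").alias("name"),
    get_json_object(col("value"), "$.payload.op").alias("op"),
    "timestamp")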

Also related - you may want to extract the new record state with Debezium's ExtractNewRecordState transform, which unwraps the envelope so each record carries only the row's latest state.
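A minimal sketch of enabling that single message transform in the connector configuration (the alias "unwrap" is arbitrary):

transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState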