在 Pyspark 结构化流中仅捕获 CDC 的有效负载?
Catch only the payload of CDC in Pyspark structured streaming?
- 我正在尝试建立从 SQL 服务器到 Pyspark 的管道,以捕获 SQL 服务器中的数据变化,我已准备就绪:
- 在 SQL 服务器中启用 CDC
- 从 SQL 服务器到 Kafka 并从 Pyspark 结构化流中的 Kafka 主题消费。
- 问题是:当我尝试通过控制台消费者检查数据更改是否正在通过 Kafka 时,它向我显示 JSON 格式的消息分为两条记录:Schema 和 Payload 以及里面的 Payload是 Before 和 After,分别为您提供更改前的数据和更改后的数据。
- 我只在有效载荷中得到处理 --> 在 JSON 消息的一部分之后
- 因为当我像这样流式传输时,在 Jupyter 命令行中,我需要的字段显示为 null,我理解这是因为 JSON 格式很复杂
- 这是我的 pyspark 代码:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 pyspark-shell'
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
kafka_topic_name = "test-spark"
kafka_bootstrap_servers = '192.168.1.3:9092'
spark = SparkSession \
.builder \
.appName("PySpark Structured Streaming with Kafka and Message Format as JSON") \
.master("local[*]") \
.getOrCreate()
# Construct a streaming DataFrame that reads from TEST-SPARK
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
.option("subscribe", kafka_topic_name) \
.load()
print("Printing Schema of df: ")
df.printSchema()
df1 = df.selectExpr("CAST(value AS STRING)", "timestamp")
df1.printSchema()
schema = StructType() \
.add("name", StringType()) \
.add("type", StringType())
df2 = df1\
.select(from_json(col("value"), schema)\
.alias("records"), "timestamp")
df3 = df2.select("records.*", "timestamp")
print("Printing Schema of records_df3: ")
df3.printSchema()
records_write_stream = df3 \
.writeStream \
.trigger(processingTime='5 seconds') \
.outputMode("update") \
.option("truncate", "false")\
.format("console") \
.start()
records_write_stream.awaitTermination()
print("Stream Data Processing Application Completed.")
- 这是一张显示 CDC 消息到达 Kafka 的图像:
- 如果有人知道如何只使用有效负载-->在参与 Pyspark 结构化流后,请帮助我。
-经过更多搜索,我发现了如何仅显示和捕获 CDC 消息的有效负载部分。
- 您需要将此添加到您的 Worker.properties:
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
您应该修改您的 Debezeium 连接器以具有 value.converter.schemas.enabled=false
,然后您将只有 payload
字段可以使用。
否则,您可以为整个对象创建 class/schema 以及 from_json()
函数,或者将值保留为字符串并使用 get_json_object()
Spark 函数来解析数据
也相关 - 您可能想要提取 NewRecordState
- 我正在尝试建立从 SQL 服务器到 Pyspark 的管道,以捕获 SQL 服务器中的数据变化,我已准备就绪:
- 在 SQL 服务器中启用 CDC
- 从 SQL 服务器到 Kafka 并从 Pyspark 结构化流中的 Kafka 主题消费。
- 问题是:当我尝试通过控制台消费者检查数据更改是否正在通过 Kafka 时,它向我显示 JSON 格式的消息分为两条记录:Schema 和 Payload 以及里面的 Payload是 Before 和 After,分别为您提供更改前的数据和更改后的数据。
- 我只在有效载荷中得到处理 --> 在 JSON 消息的一部分之后
- 因为当我像这样流式传输时,在 Jupyter 命令行中,我需要的字段显示为 null,我理解这是因为 JSON 格式很复杂
- 这是我的 pyspark 代码:
- 我只在有效载荷中得到处理 --> 在 JSON 消息的一部分之后
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 pyspark-shell'
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
kafka_topic_name = "test-spark"
kafka_bootstrap_servers = '192.168.1.3:9092'
spark = SparkSession \
.builder \
.appName("PySpark Structured Streaming with Kafka and Message Format as JSON") \
.master("local[*]") \
.getOrCreate()
# Construct a streaming DataFrame that reads from TEST-SPARK
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
.option("subscribe", kafka_topic_name) \
.load()
print("Printing Schema of df: ")
df.printSchema()
df1 = df.selectExpr("CAST(value AS STRING)", "timestamp")
df1.printSchema()
schema = StructType() \
.add("name", StringType()) \
.add("type", StringType())
df2 = df1\
.select(from_json(col("value"), schema)\
.alias("records"), "timestamp")
df3 = df2.select("records.*", "timestamp")
print("Printing Schema of records_df3: ")
df3.printSchema()
records_write_stream = df3 \
.writeStream \
.trigger(processingTime='5 seconds') \
.outputMode("update") \
.option("truncate", "false")\
.format("console") \
.start()
records_write_stream.awaitTermination()
print("Stream Data Processing Application Completed.")
- 这是一张显示 CDC 消息到达 Kafka 的图像:
- 如果有人知道如何只使用有效负载-->在参与 Pyspark 结构化流后,请帮助我。
-经过更多搜索,我发现了如何仅显示和捕获 CDC 消息的有效负载部分。
- 您需要将此添加到您的 Worker.properties:
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
您应该修改您的 Debezeium 连接器以具有 value.converter.schemas.enabled=false
,然后您将只有 payload
字段可以使用。
否则,您可以为整个对象创建 class/schema 以及 from_json()
函数,或者将值保留为字符串并使用 get_json_object()
Spark 函数来解析数据
也相关 - 您可能想要提取 NewRecordState