从 Kafka 流式传输后对列值应用函数
Apply function on column values after streaming from Kafka
我需要在从 Kafka 主题读取流之后并在将其写入任何登陆或 table 之前在某些列上应用函数。
这是在 azure databricks 中完成的。
CREATE FUNCTION encrypt AS 'com.encrypt.EncryptJava' using JAR 'hdfs:/.../jars/encryption_1.0.0.jar';
select encrypt(123,'key');
var streamingSelectDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootStrapServers)
.option("subscribe", topicName)
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(value AS STRING)").withColumn("jsonData",from_json($"value",schema)).select($"jsonData.*")
上面的代码创建了一个函数,从kafka流中读取json数据,并将其分解成多列。
接下来需要在几个列上应用上面的函数,并在将其保存到登陆或table之前对其进行转换。
我在尝试时遇到了各种不同的错误。
streamingSelectDF.withColumn("encrypted_col",encrypt($"acntnum","b1")).writeStream.outputMode("append")
.option("mergeschema",true)
.option("checkpointLocation", checkPointPath)
.format("delta")
.trigger(Trigger.ProcessingTime("5 seconds"))
.table("raw_data")
command-3006790186109139:29: error: not found: value encrypt
streamingSelectDF.withColumn("encrypted_col",encrypt($"acntnum","b1")).writeStream.outputMode("append")
任何人都可以帮助我实现同样的目标。
用
解决了
.withColumn("encrypted_acctnum",expr("encrypt(acctnum, 'b1')"))
我需要在从 Kafka 主题读取流之后并在将其写入任何登陆或 table 之前在某些列上应用函数。
这是在 azure databricks 中完成的。
CREATE FUNCTION encrypt AS 'com.encrypt.EncryptJava' using JAR 'hdfs:/.../jars/encryption_1.0.0.jar';
select encrypt(123,'key');
var streamingSelectDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootStrapServers)
.option("subscribe", topicName)
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(value AS STRING)").withColumn("jsonData",from_json($"value",schema)).select($"jsonData.*")
上面的代码创建了一个函数,从kafka流中读取json数据,并将其分解成多列。
接下来需要在几个列上应用上面的函数,并在将其保存到登陆或table之前对其进行转换。
我在尝试时遇到了各种不同的错误。
streamingSelectDF.withColumn("encrypted_col",encrypt($"acntnum","b1")).writeStream.outputMode("append")
.option("mergeschema",true)
.option("checkpointLocation", checkPointPath)
.format("delta")
.trigger(Trigger.ProcessingTime("5 seconds"))
.table("raw_data")
command-3006790186109139:29: error: not found: value encrypt
streamingSelectDF.withColumn("encrypted_col",encrypt($"acntnum","b1")).writeStream.outputMode("append")
任何人都可以帮助我实现同样的目标。
用
解决了.withColumn("encrypted_acctnum",expr("encrypt(acctnum, 'b1')"))