从 Kafka 流式传输后对列值应用函数

Apply function on column values after streaming from Kafka

我需要在从 Kafka 主题读取流之后并在将其写入任何登陆或 table 之前在某些列上应用函数。

这是在 azure databricks 中完成的。

CREATE FUNCTION encrypt AS 'com.encrypt.EncryptJava' using JAR 'hdfs:/.../jars/encryption_1.0.0.jar';

select encrypt(123,'key');

var streamingSelectDF = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootStrapServers)
    .option("subscribe", topicName)     
    .option("startingOffsets", "earliest")  
    .load()
 .selectExpr("CAST(value AS STRING)").withColumn("jsonData",from_json($"value",schema)).select($"jsonData.*")

上面的代码创建了一个函数,从kafka流中读取json数据,并将其分解成多列。

接下来需要在几个列上应用上面的函数,并在将其保存到登陆或table之前对其进行转换。

我在尝试时遇到了各种不同的错误。

streamingSelectDF.withColumn("encrypted_col",encrypt($"acntnum","b1")).writeStream.outputMode("append")
.option("mergeschema",true)
 .option("checkpointLocation", checkPointPath)
.format("delta")
.trigger(Trigger.ProcessingTime("5 seconds"))
.table("raw_data")

command-3006790186109139:29: error: not found: value encrypt
streamingSelectDF.withColumn("encrypted_col",encrypt($"acntnum","b1")).writeStream.outputMode("append")

   

任何人都可以帮助我实现同样的目标。

解决了
.withColumn("encrypted_acctnum",expr("encrypt(acctnum, 'b1')"))