如何在 Kafka 源的流式查询中处理 CSV 格式的值?
How to process values in CSV format in streaming queries over Kafka source?
我是结构化流媒体的新手,我想知道是否有一种方法可以像我们在普通结构化流媒体作业中所做的那样指定 Kafka 值 schema
。 Kafka value 中的格式是 50+ fields syslog-like csv,手动拆分非常慢。
这是我的代码的简短部分 (see full gist here)
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "myserver:9092")
.option("subscribe", "mytopic")
.load()
.select(split('value, """\^""") as "raw")
.select(ColumnExplode('raw, schema.size): _*) // flatten WrappedArray
.toDF(schema.fieldNames: _*) // apply column names
.select(fieldsWithTypeFix: _*) // cast column types from string
.select(schema.fieldNames.map(col): _*) // re-order columns, as defined in schema
.writeStream.format("console").start()
在没有进一步操作的情况下,我只能在 24 核 128GB 内存服务器上实现大约 10MB/s 的吞吐量。如果我事先将系统日志转换为 JSON 会有帮助吗?在那种情况下,我可以使用 from_json
和 schema
,也许它会更快。
is there a way to specify Kafka value's schema like what we do in normal structured streaming jobs.
没有。 kafka 外部数据源的所谓输出模式是固定的,永远无法更改。参见 this line。
Would it help if I convert the syslog to JSON in prior? In that case I can use from_json with schema, and maybe it will be faster.
我不这么认为。我什至会说 CSV 是一种比 JSON 更简单的文本格式(因为通常只有一个分隔符)。
使用 split
标准函数是可行的方法,并且认为您很难获得更好的性能,因为它会拆分行并获取每个元素来构建最终输出。
我是结构化流媒体的新手,我想知道是否有一种方法可以像我们在普通结构化流媒体作业中所做的那样指定 Kafka 值 schema
。 Kafka value 中的格式是 50+ fields syslog-like csv,手动拆分非常慢。
这是我的代码的简短部分 (see full gist here)
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "myserver:9092")
.option("subscribe", "mytopic")
.load()
.select(split('value, """\^""") as "raw")
.select(ColumnExplode('raw, schema.size): _*) // flatten WrappedArray
.toDF(schema.fieldNames: _*) // apply column names
.select(fieldsWithTypeFix: _*) // cast column types from string
.select(schema.fieldNames.map(col): _*) // re-order columns, as defined in schema
.writeStream.format("console").start()
在没有进一步操作的情况下,我只能在 24 核 128GB 内存服务器上实现大约 10MB/s 的吞吐量。如果我事先将系统日志转换为 JSON 会有帮助吗?在那种情况下,我可以使用 from_json
和 schema
,也许它会更快。
is there a way to specify Kafka value's schema like what we do in normal structured streaming jobs.
没有。 kafka 外部数据源的所谓输出模式是固定的,永远无法更改。参见 this line。
Would it help if I convert the syslog to JSON in prior? In that case I can use from_json with schema, and maybe it will be faster.
我不这么认为。我什至会说 CSV 是一种比 JSON 更简单的文本格式(因为通常只有一个分隔符)。
使用 split
标准函数是可行的方法,并且认为您很难获得更好的性能,因为它会拆分行并获取每个元素来构建最终输出。