如何将 Spark 数据帧写入 Kinesis Stream?
How to write a Spark dataframe into Kinesis Stream?
我正在使用 spark 流从 kafka 主题创建一个 Dataframe。
我想将 Dataframe 写入 Kinesis Producer。
据我所知,目前还没有官方的 API。但是互联网上有多个 API 可用,但遗憾的是,其中 none 对我有用。
星火版本:2.2
斯卡拉:2.11
我尝试使用 https://github.com/awslabs/kinesis-kafka-connector 构建 jar。但是由于这个 jar 和 spark API 之间的包名冲突而出现错误。请帮忙。
########### 这里是别人的代码:
spark-shell --jars spark-sql-kinesis_2.11-2.2.0.jar,spark-sql-kafka-0-10_2.11-2.1.0.jar,spark-streaming-kafka-0-10-assembly_2.10-2.1.0.jar --files kafka_client_jaas_spark.conf --properties-file gobblin_migration.conf --conf spark.port.maxRetries=100 --driver-java-options "-Djava.security.auth.login.config=kafka_client_jaas_spark.conf" --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas_spark.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas_spark.conf"
import java.io.File
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import scala.sys.process._
import org.apache.log4j.{ Logger, Level, LogManager, PropertyConfigurator }
import org.apache.spark.sql.streaming.Trigger
val streamingInputDF =spark.readStream.format("kafka").option("kafka.bootstrap.servers","bootstrap server").option("subscribe", "<kafkatopic>").option("startingOffsets", "latest").option("failOnDataLoss", "false").option("kafka.security.protocol", "SASL_PLAINTEXT").load()
val xdf=streamingInputDF.select(col("partition").cast("String").alias("partitionKey"),col("value").alias("data"))
xdf.writeStream.format("kinesis").option("checkpointLocation", "<hdfspath>").outputMode("Append").option("streamName", "kinesisstreamname").option("endpointUrl","kinesisendpoint").option("awsAccessKeyId", "accesskey").option("awsSecretKey","secretkey").start().awaitTermination()
对于 jar spark-sql-kinesis_2.11-2.2.0.jar,转到 quoble,下载适合您的 spark 版本的包,构建 jar。
如果您在公司网络后面,请在启动 spark 之前设置代理。
导出 http_proxy=http://server-ip:port/
导出 https_proxy=https://server-ip:port/
Kafka Connect 是一项服务,您可以 POST 您的连接器规格(在本例中为 kinesis),然后它负责 运行 连接器。它在处理记录时也支持很多转换。 Kafka Connect 插件不适用于 Spark 应用程序。
如果您的用例要求您在处理记录时执行一些业务逻辑,那么您可以选择 Spark Streaming 或 Structured Streaming 方法。
如果您想采用基于 Spark 的方法,以下是我能想到的 2 个选项。
使用结构化流。您可以为 Kinesis 使用 Strucuted 流连接器。你可以找到一个 here. There may be others too. This is the only stable and open source connector I am aware of. You can find an example for using Kinesis as a sink here.
使用 Kinesis Producer Library or aws-java-sdk-kinesis 库从您的 Spark Streaming 应用程序发布记录。此处使用 KPL 是首选方法。您可以 mapPartitions
并为每个分区创建一个 Kinesis 客户端,并使用这些库发布记录。 AWS 文档中有大量关于这两个库的示例。
我正在使用 spark 流从 kafka 主题创建一个 Dataframe。 我想将 Dataframe 写入 Kinesis Producer。 据我所知,目前还没有官方的 API。但是互联网上有多个 API 可用,但遗憾的是,其中 none 对我有用。 星火版本:2.2 斯卡拉:2.11
我尝试使用 https://github.com/awslabs/kinesis-kafka-connector 构建 jar。但是由于这个 jar 和 spark API 之间的包名冲突而出现错误。请帮忙。
########### 这里是别人的代码:spark-shell --jars spark-sql-kinesis_2.11-2.2.0.jar,spark-sql-kafka-0-10_2.11-2.1.0.jar,spark-streaming-kafka-0-10-assembly_2.10-2.1.0.jar --files kafka_client_jaas_spark.conf --properties-file gobblin_migration.conf --conf spark.port.maxRetries=100 --driver-java-options "-Djava.security.auth.login.config=kafka_client_jaas_spark.conf" --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas_spark.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas_spark.conf"
import java.io.File
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import scala.sys.process._
import org.apache.log4j.{ Logger, Level, LogManager, PropertyConfigurator }
import org.apache.spark.sql.streaming.Trigger
val streamingInputDF =spark.readStream.format("kafka").option("kafka.bootstrap.servers","bootstrap server").option("subscribe", "<kafkatopic>").option("startingOffsets", "latest").option("failOnDataLoss", "false").option("kafka.security.protocol", "SASL_PLAINTEXT").load()
val xdf=streamingInputDF.select(col("partition").cast("String").alias("partitionKey"),col("value").alias("data"))
xdf.writeStream.format("kinesis").option("checkpointLocation", "<hdfspath>").outputMode("Append").option("streamName", "kinesisstreamname").option("endpointUrl","kinesisendpoint").option("awsAccessKeyId", "accesskey").option("awsSecretKey","secretkey").start().awaitTermination()
对于 jar spark-sql-kinesis_2.11-2.2.0.jar,转到 quoble,下载适合您的 spark 版本的包,构建 jar。
如果您在公司网络后面,请在启动 spark 之前设置代理。 导出 http_proxy=http://server-ip:port/ 导出 https_proxy=https://server-ip:port/
Kafka Connect 是一项服务,您可以 POST 您的连接器规格(在本例中为 kinesis),然后它负责 运行 连接器。它在处理记录时也支持很多转换。 Kafka Connect 插件不适用于 Spark 应用程序。
如果您的用例要求您在处理记录时执行一些业务逻辑,那么您可以选择 Spark Streaming 或 Structured Streaming 方法。
如果您想采用基于 Spark 的方法,以下是我能想到的 2 个选项。
使用结构化流。您可以为 Kinesis 使用 Strucuted 流连接器。你可以找到一个 here. There may be others too. This is the only stable and open source connector I am aware of. You can find an example for using Kinesis as a sink here.
使用 Kinesis Producer Library or aws-java-sdk-kinesis 库从您的 Spark Streaming 应用程序发布记录。此处使用 KPL 是首选方法。您可以
mapPartitions
并为每个分区创建一个 Kinesis 客户端,并使用这些库发布记录。 AWS 文档中有大量关于这两个库的示例。