How to transform a Debezium message in JSON format such that it can be loaded into Redshift
I need help in achieving a few things. I have created a data pipeline as described below:
MySQL --> Debezium --> Kafka --> Kafka Connect --> AWS S3.
S3 now holds the Debezium event messages in JSON format, and these need to be loaded into Redshift as table rows:
S3 --> Redshift (target DB), as rows.
Below I have shared a Debezium event message for a single update event (the quantity for product_id 102 was updated). I am looking for a format such that when I execute the COPY command against the files in S3, it loads the changes (Create/Update/Delete) into the Redshift table.
Note: since I have set "rotate.interval.ms": "3600000" here, each file will contain Debezium messages for all the CRUD operations in that interval.
So I need a solution that converts every newly created file in S3 (holding Debezium message events) into a format I can apply the COPY command to, so that it loads into Redshift.
My main objective is to capture the CDC changes from MySQL and replicate them in Redshift.
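For reference, the COPY I would like to run against each new S3 file would be roughly along these lines (the bucket path, target table and IAM role below are placeholders, and the path only illustrates the hourly partitioner configured in the sink):

COPY inventory.orders
FROM 's3://S3bucket/topics/dbserver1.inventory.orders/2020/06/08/13/'
IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy-role'
FORMAT AS JSON 'auto'
GZIP;

As it stands, the Debezium envelope shown further below is not in a shape that such a COPY can load directly as rows.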
Here is my S3 sink connector configuration (Kafka Connect S3 sink):
{
  "name": "s3-sink-db02",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.bucket.name": "S3bucket",
    "name": "s3-sink-db02",
    "tasks.max": "3",
    "s3.region": "us-east-1",
    "aws.access_key_id": "accesskey",
    "aws.secret_access_key": "secretKey",
    "s3.part.size": "5242880",
    "s3.compression.type": "gzip",
    "timezone": "UTC",
    "locale": "en",
    "flush.size": "10000",
    "rotate.interval.ms": "3600000",
    "topics.regex": "dbserver1.(.*)",
    "internal.key.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false",
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "internal.value.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
    "path.format": "YYYY/MM/dd/HH",
    "partition.duration.ms": "3600000",
    "rotate.schedule.interval.ms": "3600000"
  }
}
Debezium message:
{
"schema": {
"name": "dbserver1.inventory.orders.Envelope",
"optional": false,
"type": "struct",
"fields": [
{
"field": "before",
"name": "dbserver1.inventory.orders.Value",
"optional": true,
"type": "struct",
"fields": [
{
"field": "order_number",
"optional": false,
"type": "int32"
},
{
"field": "order_date",
"name": "io.debezium.time.Date",
"optional": false,
"type": "int32",
"version": 1
},
{
"field": "purchaser",
"optional": false,
"type": "int32"
},
{
"field": "quantity",
"optional": false,
"type": "int32"
},
{
"field": "product_id",
"optional": false,
"type": "int32"
}
]
},
{
"field": "after",
"name": "dbserver1.inventory.orders.Value",
"optional": true,
"type": "struct",
"fields": [
{
"field": "order_number",
"optional": false,
"type": "int32"
},
{
"field": "order_date",
"name": "io.debezium.time.Date",
"optional": false,
"type": "int32",
"version": 1
},
{
"field": "purchaser",
"optional": false,
"type": "int32"
},
{
"field": "quantity",
"optional": false,
"type": "int32"
},
{
"field": "product_id",
"optional": false,
"type": "int32"
}
]
},
{
"field": "source",
"name": "io.debezium.connector.mysql.Source",
"optional": false,
"type": "struct",
"fields": [
{
"field": "version",
"optional": false,
"type": "string"
},
{
"field": "connector",
"optional": false,
"type": "string"
},
{
"field": "name",
"optional": false,
"type": "string"
},
{
"field": "ts_ms",
"optional": false,
"type": "int64"
},
{
"default": "false",
"field": "snapshot",
"name": "io.debezium.data.Enum",
"optional": true,
"type": "string",
"version": 1,
"parameters": {
"allowed": "true,last,false"
}
},
{
"field": "db",
"optional": false,
"type": "string"
},
{
"field": "table",
"optional": true,
"type": "string"
},
{
"field": "server_id",
"optional": false,
"type": "int64"
},
{
"field": "gtid",
"optional": true,
"type": "string"
},
{
"field": "file",
"optional": false,
"type": "string"
},
{
"field": "pos",
"optional": false,
"type": "int64"
},
{
"field": "row",
"optional": false,
"type": "int32"
},
{
"field": "thread",
"optional": true,
"type": "int64"
},
{
"field": "query",
"optional": true,
"type": "string"
}
]
},
{
"field": "op",
"optional": false,
"type": "string"
},
{
"field": "ts_ms",
"optional": true,
"type": "int64"
}
]
},
"payload": {
"op": "u",
"before": {
"order_date": 16816,
"quantity": 1,
"purchaser": 1001,
"order_number": 10001,
"product_id": 102
},
"after": **{
"order_date": 16816,
"quantity": 6,
"purchaser": 1001,
"order_number": 10001,
"product_id": 102
},
"source": {
"query": null,
"thread": 4,
"server_id": 223344,
"version": "1.0.3.Final",
"file": "mysql-bin.000007",
"connector": "mysql",
"pos": 354,
"name": "dbserver1",
"gtid": null,
"row": 0,
"ts_ms": 1591620600000,
"snapshot": "false",
"db": "inventory",
"table": "orders"
},
"ts_ms": 1591620602204
}
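To make the target concrete: once this update event is applied, the Redshift table should hold the "after" image as a plain row, something like the following (column types are my guess from the schema above; order_date is 16816 days since the Unix epoch, i.e. 2016-01-16):

create table inventory.orders (
  order_number int,
  order_date   date,
  purchaser    int,
  quantity     int,
  product_id   int
);
-- expected row after the update:
-- 10001 | 2016-01-16 | 1001 | 6 | 102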
I will refine this answer when I get time. It comes from one of our production repos, which uses the https://github.com/goibibo/dataplatform_utils library.
import com.goibibo.dp.utils.{KfUtils09, SparkKafkaUtils09}
import com.goibibo.dp.utils.SparkKafkaUtils09._
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}
import org.slf4j.{Logger, LoggerFactory}
import org.json4s.jackson.Serialization
import org.json4s._
import org.json4s.jackson.Serialization.{read, write}
import scala.collection.JavaConverters._

object KafkaToRedShift {

  val spark: SparkSession = getSparkSession("KafkaToRedShift") /* Implement this */
  val logger: Logger = LoggerFactory.getLogger(KafkaToRedShift.getClass)

  def createOffsetRange(kafkaBrokers: String, topics: Seq[String],
                        consumerGroup: String, maxMessagesPerPartition: Option[Int],
                        readFrom: String = READ_FROM_COMMITTED): (Seq[OffsetRange], KOffsets, Boolean) = {
    var isReadRequired = false
    val kafkaConfig = KfUtils09.createKafkaConfig(kafkaBrokers, consumerGroup)
    val topicsNames = topics.asJava
    val earliestOffsets: Map[TopicPartition, Long] = KfUtils09.getEarliestOffsets(topicsNames, kafkaConfig).get
    val latestOffsets: Map[TopicPartition, Long] = KfUtils09.getLatestOffsets(topicsNames, kafkaConfig).get
    val committedOffsets: Map[TopicPartition, Long] = KfUtils09.getCommittedOffsets(topicsNames, kafkaConfig).get
    val fromOffsets =
      if (READ_FROM_EARLIEST.equals(readFrom)) earliestOffsets
      else if (READ_FROM_LATEST.equals(readFrom)) latestOffsets
      else committedOffsets

    val offsetRanges: List[OffsetRange] = latestOffsets.toList.map(pairTopicPartitionAndOffset => {
      val (tp, untilOffset) = pairTopicPartitionAndOffset
      val totalMessagesInPartition = untilOffset - fromOffsets(tp)
      logger.info(s"${tp.topic} ${tp.partition} earliestOffsets = $earliestOffsets committedOffsets = ${committedOffsets(tp)} fromOffsets = ${fromOffsets(tp)} untilOffset = $untilOffset")
      logger.info(s"${tp.topic} ${tp.partition} totalMessagesInPartition = $totalMessagesInPartition ")
      val newUntilOffset = if (maxMessagesPerPartition.isDefined) {
        if (totalMessagesInPartition > maxMessagesPerPartition.get) {
          logger.info(s"${tp.topic} ${tp.partition} totalMessagesInPartition = $totalMessagesInPartition higher than maxMessagesPerPartition = $maxMessagesPerPartition")
          val newUntilOffset = fromOffsets(tp) + maxMessagesPerPartition.get
          logger.info(s"${tp.topic} ${tp.partition} new untilOffset = $newUntilOffset")
          newUntilOffset
        } else {
          untilOffset
        }
      } else {
        untilOffset
      }
      if (newUntilOffset > fromOffsets(tp)) {
        isReadRequired = true
      }
      OffsetRange.create(tp.topic, tp.partition, fromOffsets(tp), newUntilOffset)
    })
    val latestOffsetsN = offsetRanges.map(o => (new TopicPartition(o.topic, o.partition), o.untilOffset)).toMap
    (offsetRanges, latestOffsetsN, isReadRequired)
  }

  def main(args: Array[String]): Unit = {
    implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
    val kafkaBrokers: String = ConfigUtils.bootstrapServers
    val topics: Seq[String] = Seq(ConfigUtils.readTopic)
    val consumerGroup: String = ConfigUtils.kShiftGroup

    if (args.isEmpty) {
      throw new IllegalArgumentException("please provide filename")
    }
    val fileName = args.head
    logger.info(s"Found file name in argument: $fileName")

    // Parse the job config (table name, per-table transformation SQL, source) from the given file.
    val configStr = readFromFile(fileName) /* Implement this */
    val conf: MySqlConfig = Serialization.read[DatabaseConfig](configStr)
    val sql: String = conf.sql.get
    val fullTableName = conf.tableName.split('.')
    val tableSchema = fullTableName.head
    val table: String = fullTableName.tail.head
    val source = conf.source
    var batchpush = false
    val lockFileName = s"${topics}_file.lck"

    try {
      acquireLock(lockFileName) /* Implement this */
      implicit val formats = Serialization.formats(NoTypeHints)
      val (offsets, latestOffsets, isReadRequired) = createOffsetRange(kafkaBrokers, topics, consumerGroup, None)
      if (isReadRequired) {
        // Build the starting/ending offset JSON for a bounded (batch) Kafka read.
        val fromOffset: Map[String, Map[String, Long]] = offsets.map { o =>
          (o.topic, o.topicPartition.partition(), o.fromOffset)
        }.groupBy(_._1).mapValues(value => value.map(t => (t._2.toString, t._3)).toMap)
        if (fromOffset.forall(o => o._2.forall(t => t._2 == 0))) {
          batchpush = true
        }
        val fromOffsetStr: String = write(fromOffset)
        val toOffset: Map[String, Map[String, Long]] = offsets.map { o =>
          (o.topic, o.topicPartition.partition(), o.untilOffset)
        }.groupBy(_._1).mapValues(value => value.map(t => (t._2.toString, t._3)).toMap)
        val toOffsetStr: String = write(toOffset)

        // Batch-read the Debezium messages for those offset ranges.
        val df: DataFrame = spark.
          read.
          format("kafka").
          option("kafka.bootstrap.servers", kafkaBrokers).
          option("subscribe", topics.mkString(",")).
          option("startingOffsets", fromOffsetStr).
          option("endingOffsets", toOffsetStr).
          load().cache
        df.createOrReplaceTempView("raw_data")

        val tfm = if (source == "mysql") {
          logger.info(s"Execute data transformation query: $sql")
          spark.sql(sql)
        } else {
          // Keep only the latest event per key, then apply the per-table transformation SQL.
          spark.sql(
            """
            with a as(
            select cast(key as string) key, cast(value as string) value,
            timestamp as ingestion_time,topic,partition,offset
            from raw_data
            ),
            b as
            (
            select *,row_number() over(partition by key order by topic,partition,offset desc ) r from a
            )
            select * from b where r = 1
            """).drop("r").createOrReplaceTempView("dedup_data")
          // create or replace temp view casted_data as
          logger.info(s"Execute data transformation query: $sql")
          spark.sql(sql)
        }
        val columnNames = tfm.schema.map(_.name).map(c => s""" "$c" """).mkString(",")
        tfm.createOrReplaceTempView("tfm")

        // Write the transformed batch as Parquet to a table that Redshift can see via Spectrum.
        val spectrumTableName = s"misc.${table}_realtime_tmp"
        spark.sql(s"drop table if exists $spectrumTableName")
        spark.table("tfm").repartition(3).write.format("parquet").mode("overwrite").saveAsTable(spectrumTableName)

        // Merge the batch into the target table via a temp staging table read from the external (Spectrum) schema.
        val redshiftSql: String =
          s"""
             |create temp table ${table}_realtime_staging
             | distkey(key)
             | sortkey(key)
             | as
             | select * from misc_e.${table}_realtime_tmp;
             |
             | CREATE TABLE if not exists $tableSchema.${table}_realtime (like ${table}_realtime_staging);
             |
             | delete from $tableSchema.${table}_realtime
             | where key in (select key from ${table}_realtime_staging where key is not null);
             |
             | insert into $tableSchema.${table}_realtime($columnNames)
             | select $columnNames from ${table}_realtime_staging where raw_json != 'Deleted';
             |
             | select 1;
           """.stripMargin
        logger.info(s"Executing Below Query in Redshift: $redshiftSql")
        RedshiftUtils.executeRedshiftSql(redshiftSql) /* Implement this */

        // Commit the consumed offsets only after the Redshift merge succeeded.
        if (!SparkKafkaUtils09.commitOffsets(kafkaBrokers, consumerGroup, latestOffsets)) {
          logger.warn("Commit Offsets failed")
        } else {
          logger.info("Commit Offsets successful!")
        }
      }
    } catch {
      case e: Exception => {
        val stackTrace = org.apache.commons.lang.exception.ExceptionUtils.getStackTrace(e)
        logger.info(s"Releasing lock due to exception.. $stackTrace")
        releaseLock(lockFileName) /* Implement this */
        throw new Exception("exiting due to exception..", e)
      }
    } finally {
      // release the lock
      releaseLock(lockFileName) /* Implement this */
    }
    logger.info("All Done!!!")
    System.exit(0)
  }
}
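A couple of notes on the snippet above. It reads the Debezium topics directly from Kafka in bounded batches (rather than the files in S3), dedupes events per key, writes the transformed batch as Parquet to a table visible to Redshift Spectrum, and then merges it into the target table. The flattening of the Debezium envelope itself happens in the per-table transformation SQL that the job loads from its config file (conf.sql) and runs against the raw_data / dedup_data views; that SQL is not part of the snippet. As a rough, hypothetical sketch for the orders topic from the question (the column names, the key choice and the 'Deleted' marker are assumptions inferred from the merge SQL above), it could look something like this:

-- Hypothetical per-table transformation SQL (the sql field of the job config), Spark SQL.
-- For delete events the Debezium after image is null, so the key falls back to before,
-- and raw_json is marked 'Deleted' so the final insert skips the row after the delete step.
select
  coalesce(get_json_object(value, '$.payload.after.order_number'),
           get_json_object(value, '$.payload.before.order_number'))           as key,
  cast(get_json_object(value, '$.payload.after.order_number') as int)         as order_number,
  date_add('1970-01-01',
           cast(get_json_object(value, '$.payload.after.order_date') as int)) as order_date,
  cast(get_json_object(value, '$.payload.after.purchaser') as int)            as purchaser,
  cast(get_json_object(value, '$.payload.after.quantity') as int)             as quantity,
  cast(get_json_object(value, '$.payload.after.product_id') as int)           as product_id,
  case when get_json_object(value, '$.payload.op') = 'd' then 'Deleted'
       else value end                                                         as raw_json,
  ingestion_time
from dedup_data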