根据嵌套 ID 的列表乘以事件大小写 class

Multiplying event case class depending on the list based on nested IDs

我正在处理一个数据帧并使用事件案例 class.How 转换为数据集 [事件] 曾经有嵌套的 ID,我需要根据嵌套 device:os 的扁平化来乘以事件。

我能够 return Kafka 事件级别的案例 class 事件。但不确定如何乘以事件。

Kafka 传入事件:

{
  "partition": 1,
  "key": "34768_20220203_MFETP501",
  "offset": 1841543,
  "createTime": 1646041475348,
  "topic": "topic_int",
  "publishTime": 1646041475344,
  "errorCode": 0,
  "userActions": {
    "productId": "3MFETP501",
    "createdDate": "2022-02-26T11:19:35.786Z",
    "events": [
      {
        "GUID": "dbb1-f38b-f7f0-44af-90da-80179412f89c",
        "eventDate": "2022-02-26T11:19:35.786Z",
        "familyId": 2010,
        "productTypeId": 1004678,
        "serialID": "890479804",
        "productName": "MFE Total Protection 2021 Family Pack",
        "features": {
          "mapping": [
            {
              "deviceId": 999795,
              "osId": [
                100
              ]
            },
            {
              "deviceId": 987875
              "osId": [
                101
              ]
            }
          ]
        }
      }
    ]
  }
}

事件

的预期输出案例classes
Event("3MFETP501","1004678","2010","3MFETP501:890479804","MFE Total Protection 2021 Family Pack","999795_100", Map("targetId"->"999795_100") )
Event("3MFETP501","1004678","2010","3MFETP501:890479804","MFE Total Protection 2021 Family Pack","987875_100", Map("targetId"->"987875_100") )
case class Event(
                    productId: String,
                    familyId: String,
                    productTypeId: String,
                    key: String,
                    productName: String,
                    deviceOS:String, 
                    var featureMap: mutable.Map[String, String])




val finalDataset:Dataset[Event] = inputDataFrame.flatMap(
row=> {

        val productId = row.getAs[String]("productId")
        val userActions = row.getAs[Row]("userActions")
        val userEvents:mutable.Seq[Row] = userActions.getAs[mutable.WrappedArray[Row]]("events")

        val processedEvents:mutable.Seq[Row]= userEvents.map(
          event=> 

            val productTypeId = event.getAs[Int]("productTypeId")
            val familyId = event.getAs[String]("familyId")
            val features = activity.getAs[mutable.WrappedArray[Row]]("features")
            val serialId = activity.getAs[String]("serialId")
            val key =  productId+":"+serialId
            val features = mutable.Map[String, String]().withDefaultValue(null)
            

            val device_os_list=List("999795_100","987875_101")
            //Feature Map is for every device_os ( example "targetId"->"999795_100") for 999795_100

      if (familyId == 2010 )
    {
      val a: Option[List[String]] = flatten the deviceId,osId ..
      
      a.get.map(i=>{
          val key: String =  methodToCombinedeviceIdAndosId
          val featureMapping: mutable.Map[String, String] = getfeatureMapForInvidualKey

          Event(productId,productTypeId,familyId,key,productName,device_os,feature) ---> This is returning **List[Event]** 
        })
      }
    else{
    Event(productId,productTypeId,familyId,key,productName,device_os,feature)  --> This is returning **Event**. THIS WORKS

    }
            
        )

}

)


我没有完全相同地实现它,但我认为理解逻辑并将其应用到您的案例中是可能的。

我创建了 json 文件,如 kafka.json 并将代码放在那里(你的事件):

[{
  "partition": 1,
  "key": "34768_20220203_MFETP501",
  "offset": 1841543,
  "createTime": 1646041475348,
  "topic": "topic_int",
  "publishTime": 1646041475344,
  "errorCode": 0,
  "userActions": {
    "productId": "3MFETP501",
    "createdDate": "2022-02-26T11:19:35.786Z",
    "events": [
      {
        "GUID": "dbb1-f38b-f7f0-44af-90da-80179412f89c",
        "eventDate": "2022-02-26T11:19:35.786Z",
        "familyId": 2010,
        "productTypeId": 1004678,
        "serialID": "890479804",
        "productName": "MFE Total Protection 2021 Family Pack",
        "features": {
          "mapping": [
            {
              "deviceId": 999795,
              "osId": [
                100
              ]
            },
            {
              "deviceId": 987875,
              "osId": [
                101
              ]
            }
          ]
        }
      }
    ]
  }
}]

请在下面找到第一个基于 flatMapfor 循环的解决方案。

  case class Event(
      productId: String,
      familyId: String,
      productTypeId: String,
      key: String,
      productName: String,
      deviceOS: String,
      featureMap: Map[String, String])

  import org.apache.spark.sql.{Dataset, Row, SparkSession}

  import scala.collection.mutable

  val spark = SparkSession
      .builder
      .appName("StructuredStreaming")
      .master("local[*]")
      .getOrCreate()

  private val inputDataFrame = spark.read.option("multiline", "true").format("json").load("/absolute_path_to_kafka.json")


  import spark.implicits._

  val finalDataset: Dataset[Event] = inputDataFrame.flatMap(
    row => {

      val userActions = row.getAs[Row]("userActions")
      val productId = userActions.getAs[String]("productId")

      val userEvents = userActions.getAs[mutable.WrappedArray[Row]]("events")
      for (event <- userEvents;
           familyId = event.getAs[Int]("familyId").toString;
           productTypeId = event.getAs[Int]("productTypeId").toString;
           serialId = event.getAs[String]("serialID");
           productName = event.getAs[String]("productName");
           key = s"$productId:$serialId";
           features = event.getAs[Row]("features");
           mappings = features.getAs[mutable.WrappedArray[Row]]("mapping");
           mappingRow <- mappings;
           deviceId = mappingRow.getAs[Long]("deviceId");
           osIds = mappingRow.getAs[mutable.WrappedArray[Long]]("osId");
           osId <- osIds;
           deviseOs = deviceId + "_" + osId
           ) yield Event(productId, familyId, productTypeId, key, productName, deviseOs, Map("target" -> (deviseOs)))
    }

  )

  finalDataset.foreach(e => println(e))

//  Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,999795_100,Map(target -> 999795_100))
//  Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,987875_101,Map(target -> 987875_101))

此外,您可以使用 selectwithColumn[=15= 来解决此任务] 函数。

  case class Event(
      productId: String,
      familyId: String,
      productTypeId: String,
      key: String,
      productName: String,
      deviceOS: String,
      featureMap: Map[String, String])

  import org.apache.spark.sql.{Dataset, SparkSession}
  import org.apache.spark.sql.functions.{col, explode, concat, lit, map}

  val spark = SparkSession
      .builder
      .appName("StructuredStreaming")
      .master("local[*]")
      .getOrCreate()

  private val inputDataFrame = spark.read.option("multiline", "true").format("json").load("/absolute_path_to_kafka.json")

  val transformedDataFrame = inputDataFrame
      .select(col("userActions.productId").as("productId"),
        explode(col("userActions.events")).as("event"))
      .select(col("productId"),
        col("event.familyId").as("familyId"),
        col("event.productTypeId").as("productTypeId"),
        col("event.serialID").as("serialID"),
        col("event.productName").as("productName"),
        explode(col("event.features.mapping")).as("features")
      )
      .select(
        col("productId"),
        col("familyId"),
        col("productTypeId"),
        col("serialID"),
        col("productName"),
        col("features.deviceId").as("deviceId"),
        explode(col("features.osId")).as("osId")
      )
      .withColumn("key", concat(col("productId"), lit(":"), col("serialID")))
      .withColumn("deviceOS", concat(col("deviceId"), lit("_"), col("osId")))
      .withColumn("featureMap", map(lit("target"), col("deviceOS")))

  import spark.implicits._

  private val result: Dataset[Event] = transformedDataFrame.as[Event]
  result.foreach(e => println(e))

//  Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,999795_100,Map(target -> 999795_100))
//  Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,987875_101,Map(target -> 987875_101))

添加选项以根据字段值 1 自定义响应。我在这里将 for 理解替换为 map/flatmap,因此您可以 return 作为基于类型的响应一个或多个事件.另外,我稍微自定义了 json 以在结果中显示更多示例。

新 json:

[{
  "partition": 1,
  "key": "34768_20220203_MFETP501",
  "offset": 1841543,
  "createTime": 1646041475348,
  "topic": "topic_int",
  "publishTime": 1646041475344,
  "errorCode": 0,
  "userActions": {
    "productId": "3MFETP501",
    "createdDate": "2022-02-26T11:19:35.786Z",
    "events": [
      {
        "GUID": "dbb1-f38b-f7f0-44af-90da-80179412f89c",
        "eventDate": "2022-02-26T11:19:35.786Z",
        "familyId": 2010,
        "productTypeId": 1004678,
        "serialID": "890479804",
        "productName": "MFE Total Protection 2021 Family Pack",
        "features": {
          "mapping": [
            {
              "deviceId": 999795,
              "osId": [
                100,
                110
              ]
            },
            {
              "deviceId": 987875,
              "osId": [
                101
              ]
            }
          ]
        }
      },
      {
        "GUID": "1111-2222-f7f0-44af-90da-80179412f89c",
        "eventDate": "2022-03-26T11:19:35.786Z",
        "familyId": 2011,
        "productTypeId": 1004679,
        "serialID": "890479805",
        "productName": "Product name",
        "features": {
          "mapping": [
            {
              "deviceId": 999796,
              "osId": [
                103
              ]
            },
            {
              "deviceId": 987877,
              "osId": [
                104
              ]
            }
          ]
        }
      }
    ]
  }
}]

请在下面找到代码:

  case class Event(
      productId: String,
      familyId: String,
      productTypeId: String,
      key: String,
      productName: String,
      deviceOS: String,
      featureMap: Map[String, String])

  import org.apache.spark.sql.{Dataset, SparkSession}

  val spark = SparkSession
      .builder
      .appName("StructuredStreaming")
      .master("local[*]")
      .getOrCreate()

  private val inputDataFrame = spark.read.option("multiline", "true").format("json").load("/absolute_path_to_kafka.json")

  import spark.implicits._

  val finalDataset: Dataset[Event] = inputDataFrame.flatMap(
    row => {

      val userActions = row.getAs[Row]("userActions")
      val productId = userActions.getAs[String]("productId")

      val userEvents = userActions.getAs[mutable.WrappedArray[Row]]("events")
      for (event <- userEvents;
           productTypeId = event.getAs[Int]("productTypeId").toString;
           serialId = event.getAs[String]("serialID");
           productName = event.getAs[String]("productName");
           key = s"$productId:$serialId";
           familyId = event.getAs[Int]("familyId").toString;
           features = event.getAs[Row]("features");
           mappings = features.getAs[mutable.WrappedArray[Row]]("mapping");
           mappingRow <- mappings;
           deviceId = mappingRow.getAs[Long]("deviceId");
           osIds = mappingRow.getAs[mutable.WrappedArray[Long]]("osId");
           osId <- osIds;
           deviseOs = deviceId + "_" + osId
           ) yield Event(productId, familyId, productTypeId, key, productName, deviseOs, Map("target" -> deviseOs))

      userEvents.flatMap(event => {
        val productTypeId = event.getAs[Int]("productTypeId").toString
        val serialId = event.getAs[String]("serialID")
        val productName = event.getAs[String]("productName")
        val key = s"$productId:$serialId"
        val familyId = event.getAs[Long]("familyId")

        if(familyId == 2010) {
          val features = event.getAs[Row]("features")
          val mappings = features.getAs[mutable.WrappedArray[Row]]("mapping")
          mappings.flatMap(mappingRow => {
            val deviceId = mappingRow.getAs[Long]("deviceId")
            val osIds = mappingRow.getAs[mutable.WrappedArray[Long]]("osId")
            osIds.map(osId => {
              val devise_os = deviceId + "_" + osId
              Event(productId, familyId.toString, productTypeId, key, productName, devise_os, Map("target" -> devise_os))
            })
          })
        } else {
          Seq(Event(productId, familyId.toString, productTypeId, key, productName, "default_defice_os", Map("target" -> "default_defice_os")))
        }
      })

    }
  )

  finalDataset.foreach(e => println(e))

//  Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,999795_100,Map(target -> 999795_100))
//  Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,999795_110,Map(target -> 999795_110))
//  Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,987875_101,Map(target -> 987875_101))
//  Event(3MFETP501,2011,1004679,3MFETP501:890479805,Product name,default_defice_os,Map(target -> default_defice_os))

因为这是在一行 DataFrame 下,返回事件案例 class,转换为 DataSet.Issue 这里是针对一个条件,我得到 List[Event] 和休息类型,我只得到 Event class

仅供参考:这不是答案。但我进一步尝试解决。

if (familyId == 2010 )
    {
      val a: Option[List[String]] = flatten the deviceId,osId ..
      
      a.get.map(i=>{
          val key: String =  methodToCombinedeviceIdAndosId
          val featureMapping: mutable.Map[String, String] = getfeatureMapForInvidualKey

          Event(productId,productTypeId,familyId,key,productName,device_os,feature) ---> This is returning List[Event]
        })
      }
    else{
    Event(productId,productTypeId,familyId,key,productName,device_os,feature)  --> This is returning Event

    }