spark groupBy 操作挂在 199/200
spark groupBy operation hangs at 199/200
我有一个带有主节点和两个执行器的 spark 独立集群。我有一个 RDD[LevelOneOutput]
下面是 LevelOneOutput
class
class LevelOneOutput extends Serializable {
@BeanProperty
var userId: String = _
@BeanProperty
var tenantId: String = _
@BeanProperty
var rowCreatedMonth: Int = _
@BeanProperty
var rowCreatedYear: Int = _
@BeanProperty
var listType1: ArrayBuffer[TypeOne] = _
@BeanProperty
var listType2: ArrayBuffer[TypeTwo] = _
@BeanProperty
var listType3: ArrayBuffer[TypeThree] = _
...
...
@BeanProperty
var listType18: ArrayBuffer[TypeEighteen] = _
@BeanProperty
var groupbyKey: String = _
}
现在我想根据 userId、tenantId、rowCreatedMonth、rowCreatedYear 对这个 RDD 进行分组。为此我做了这个
val levelOneRDD = inputRDD.map(row => {
row.setGroupbyKey(s"${row.getTenantId}_${row.getRowCreatedYear}_${row.getRowCreatedMonth}_${row.getUserId}")
row
})
val groupedRDD = levelOneRDD.groupBy(row => row.getGroupbyKey)
这给了我键中的数据 String
和值 Iterable[LevelOneOutput]
现在我想为该组密钥生成一个 LevelOneOutput
对象。为此,我正在做类似下面的事情:
val rdd = groupedRDD.map(row => {
val levelOneOutput = new LevelOneOutput
val groupKey = row._1.split("_")
levelOneOutput.setTenantId(groupKey(0))
levelOneOutput.setRowCreatedYear(groupKey(1).toInt)
levelOneOutput.setRowCreatedMonth(groupKey(2).toInt)
levelOneOutput.setUserId(groupKey(3))
var listType1 = new ArrayBuffer[TypeOne]
var listType2 = new ArrayBuffer[TypeTwo]
var listType3 = new ArrayBuffer[TypeThree]
...
...
var listType18 = new ArrayBuffer[TypeEighteen]
row._2.foreach(data => {
if (data.getListType1 != null) listType1 = listType1 ++ data.getListType1
if (data.getListType2 != null) listType2 = listType2 ++ data.getListType2
if (data.getListType3 != null) listType3 = listType3 ++ data.getListType3
...
...
if (data.getListType18 != null) listType18 = listType18 ++ data.getListType18
})
if (listType1.isEmpty) levelOneOutput.setListType1(null) else levelOneOutput.setListType1(listType1)
if (listType2.isEmpty) levelOneOutput.setListType2(null) else levelOneOutput.setListType2(listType2)
if (listType3.isEmpty) levelOneOutput.setListType3(null) else levelOneOutput.setListType3(listType3)
...
...
if (listType18.isEmpty) levelOneOutput.setListType18(null) else levelOneOutput.setListType18(listType18)
levelOneOutput
})
对于较小的输入,这按预期工作,但是当我尝试 运行 较大的输入数据集时,按操作分组在 199/200 时挂起,我没有看到stdout/stderr
中的任何特定错误或警告
有人能告诉我为什么这项工作没有进一步进行吗...
我没有使用 groupBy
操作,而是创建了如下所示的配对 RDD
val levelOnePairedRDD = inputRDD.map(row => {
row.setGroupbyKey(s"${row.getTenantId}_${row.getRowCreatedYear}_${row.getRowCreatedMonth}_${row.getUserId}")
(row.getGroupByKey, row)
})
并更新了处理逻辑,解决了我的问题。
我有一个带有主节点和两个执行器的 spark 独立集群。我有一个 RDD[LevelOneOutput]
下面是 LevelOneOutput
class
class LevelOneOutput extends Serializable {
@BeanProperty
var userId: String = _
@BeanProperty
var tenantId: String = _
@BeanProperty
var rowCreatedMonth: Int = _
@BeanProperty
var rowCreatedYear: Int = _
@BeanProperty
var listType1: ArrayBuffer[TypeOne] = _
@BeanProperty
var listType2: ArrayBuffer[TypeTwo] = _
@BeanProperty
var listType3: ArrayBuffer[TypeThree] = _
...
...
@BeanProperty
var listType18: ArrayBuffer[TypeEighteen] = _
@BeanProperty
var groupbyKey: String = _
}
现在我想根据 userId、tenantId、rowCreatedMonth、rowCreatedYear 对这个 RDD 进行分组。为此我做了这个
val levelOneRDD = inputRDD.map(row => {
row.setGroupbyKey(s"${row.getTenantId}_${row.getRowCreatedYear}_${row.getRowCreatedMonth}_${row.getUserId}")
row
})
val groupedRDD = levelOneRDD.groupBy(row => row.getGroupbyKey)
这给了我键中的数据 String
和值 Iterable[LevelOneOutput]
现在我想为该组密钥生成一个 LevelOneOutput
对象。为此,我正在做类似下面的事情:
val rdd = groupedRDD.map(row => {
val levelOneOutput = new LevelOneOutput
val groupKey = row._1.split("_")
levelOneOutput.setTenantId(groupKey(0))
levelOneOutput.setRowCreatedYear(groupKey(1).toInt)
levelOneOutput.setRowCreatedMonth(groupKey(2).toInt)
levelOneOutput.setUserId(groupKey(3))
var listType1 = new ArrayBuffer[TypeOne]
var listType2 = new ArrayBuffer[TypeTwo]
var listType3 = new ArrayBuffer[TypeThree]
...
...
var listType18 = new ArrayBuffer[TypeEighteen]
row._2.foreach(data => {
if (data.getListType1 != null) listType1 = listType1 ++ data.getListType1
if (data.getListType2 != null) listType2 = listType2 ++ data.getListType2
if (data.getListType3 != null) listType3 = listType3 ++ data.getListType3
...
...
if (data.getListType18 != null) listType18 = listType18 ++ data.getListType18
})
if (listType1.isEmpty) levelOneOutput.setListType1(null) else levelOneOutput.setListType1(listType1)
if (listType2.isEmpty) levelOneOutput.setListType2(null) else levelOneOutput.setListType2(listType2)
if (listType3.isEmpty) levelOneOutput.setListType3(null) else levelOneOutput.setListType3(listType3)
...
...
if (listType18.isEmpty) levelOneOutput.setListType18(null) else levelOneOutput.setListType18(listType18)
levelOneOutput
})
对于较小的输入,这按预期工作,但是当我尝试 运行 较大的输入数据集时,按操作分组在 199/200 时挂起,我没有看到stdout/stderr
中的任何特定错误或警告有人能告诉我为什么这项工作没有进一步进行吗...
我没有使用 groupBy
操作,而是创建了如下所示的配对 RDD
val levelOnePairedRDD = inputRDD.map(row => {
row.setGroupbyKey(s"${row.getTenantId}_${row.getRowCreatedYear}_${row.getRowCreatedMonth}_${row.getUserId}")
(row.getGroupByKey, row)
})
并更新了处理逻辑,解决了我的问题。