如果最近 1 小时内没有推送任何记录,如何设置 Spark Streaming 作业的警报?
How can I set alert of spark streaming job if no records are being pushed in last 1 hour?
我有一个 spark streaming 作业,它从 solace 队列读取和处理数据。如果在过去一小时内没有消耗任何数据,我想对其设置警报。目前,我已将批次 window 设置为 1 分钟。如果连续一个小时没有数据消耗,如何添加警报以便通知来源?
您有多种选择:
您可以通过将最后收到的记录的时间戳保存在 hdfs 文件中来跟踪它。然后在处理微批处理时,如果 rdd 为空并且当前时间戳和 hdfs 中的时间戳的差异超过一个小时,您可以使用邮件服务发送邮件。如果您在微批处理中收到一些记录,您可以相应地更新 hdfs 文件中的时间戳。
您的代码将如下所示,您需要实现 getTimeStampFromHDFS()
,这将在您的 hdfs 文件中实现 return 时间戳和 updateTimestampHDFS(currentTimestamp)
,当您在微批处理中收到记录时,您将在其中更新时间戳.
dstream.foreachRDD{rdd =>
if(rdd.isEmpty) {
if((System.currentTimeMillis - getTimeStampFromHDFS()) / (1000 * 60 * 60) >= 1) sendMailAlert()
}
else {
updateTimestampHDFS(System.currentTimeMillis)
}
}
我有一个 spark streaming 作业,它从 solace 队列读取和处理数据。如果在过去一小时内没有消耗任何数据,我想对其设置警报。目前,我已将批次 window 设置为 1 分钟。如果连续一个小时没有数据消耗,如何添加警报以便通知来源?
您有多种选择:
您可以通过将最后收到的记录的时间戳保存在 hdfs 文件中来跟踪它。然后在处理微批处理时,如果 rdd 为空并且当前时间戳和 hdfs 中的时间戳的差异超过一个小时,您可以使用邮件服务发送邮件。如果您在微批处理中收到一些记录,您可以相应地更新 hdfs 文件中的时间戳。
您的代码将如下所示,您需要实现 getTimeStampFromHDFS()
,这将在您的 hdfs 文件中实现 return 时间戳和 updateTimestampHDFS(currentTimestamp)
,当您在微批处理中收到记录时,您将在其中更新时间戳.
dstream.foreachRDD{rdd =>
if(rdd.isEmpty) {
if((System.currentTimeMillis - getTimeStampFromHDFS()) / (1000 * 60 * 60) >= 1) sendMailAlert()
}
else {
updateTimestampHDFS(System.currentTimeMillis)
}
}