使用 apache spark streaming 进行实时日志处理

real time log processing using apache spark streaming

我想创建一个可以实时读取日志的系统,并使用apache spark 进行处理。我很困惑是应该使用 kafka 或 flume 之类的东西将日志传递给 spark stream,还是应该使用套接字传递日志。我已经完成了 spark 流文档中的示例程序- Spark stream example。但如果有人能指导我更好地传递日志到 spark stream,我将不胜感激。这对我来说是一块新地盘。

您可以使用 Apache Kafka 作为日志队列系统。生成日志的系统,例如 websever 会将日志发送到 Apache KAFKA。然后你可以使用apache storm或者spark streaming库实时读取KAFKA主题和进程日志。

您需要创建日志流,您可以使用 Apache Kakfa 创建。 kafka 可以与 storm 和 apache spark 集成。两者各有利弊。

对于 Storm Kafka 集成,请查看 here

A​​pache Spark Kafka Integration 看看here

Apache Flume 可能有助于实时读取日志。 Flume 为使用 Spark Streaming 分析所需信息的应用程序提供日志收集和传输。

1.official site or follow the instructions from here

下载 Apache Flume

2. 设置和 运行 Flume 从安装Flume的目录(FLUME_INSTALLATION_PATH\conf)修改flume-conf.properties.template,这里需要提供logs source,channel和sinks(output)。有关设置的更多详细信息 here

有一个启动 flume 的示例,它从 windows 主机上的 ping 命令 运行ning 收集日志信息并将其写入文件:

flume-conf.properties

agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.seqGenSrc.type = exec
agent.sources.seqGenSrc.shell = powershell -Command

agent.sources.seqGenSrc.command = for() { ping google.com }

agent.sources.seqGenSrc.channels = memoryChannel

agent.sinks.loggerSink.type = file_roll

agent.sinks.loggerSink.channel = memoryChannel
agent.sinks.loggerSink.sink.directory = D:\TMP\flu\
agent.sinks.loggerSink.serializer = text
agent.sinks.loggerSink.appendNewline = false
agent.sinks.loggerSink.rollInterval = 0

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100

到运行例子转到FLUME_INSTALLATION_PATH并执行

java -Xmx20m -Dlog4j.configuration=file:///%CD%\conf\log4j.properties -cp .\lib\* org.apache.flume.node.Application -f conf\flume-conf.properties -n agent

或者您可以创建在类路径中具有 flume 库的 java 应用程序,并从传递相应参数的应用程序中调用 org.apache.flume.node.Application 实例。

如何设置Flume收集和传输日志?

您可以使用一些脚本从指定位置收集日志

agent.sources.seqGenSrc.shell = powershell -Command
agent.sources.seqGenSrc.command = your script here

除了 windows 脚本,您还可以启动 java 应用程序(在字段中输入 'java path_to_main_class arguments'),它提供智能日志收集。例如,如果实时修改文件,您可以使用 Apache Commons IO 中的 Tailer。 要配置 Flume 以传输日志信息,请阅读此 article

3. 从您的源代码中获取 Flume 流并使用 Spark 对其进行分析。 查看 github https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java

中的代码示例

虽然这是一个老问题,但从 Databricks 发布了一个 link,其中有一篇很棒的分步文章,用于考虑许多领域的 Spark 日志分析。

https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/index.html

希望对您有所帮助。