Apache 中的 Apache Avro 模式验证 Flume

Apache Avro schema validation in Apache Flume

阅读 Apache Flume 及其在处理客户端事件方面的优势后,我决定是时候开始更详细地研究它了。另一个巨大的好处似乎是它可以处理 Apache Avro 对象 :-) 但是,我很难理解 Avro 模式如何用于验证收到的 Flume 个事件。

为了帮助更详细地了解我的问题,我在下面提供了代码片段;

Avro 架构

出于此 post 的目的,我使用了一个示例架构,它定义了一个包含 2 个字段的嵌套 Object1 记录。

{
  "namespace": "com.example.avro",
  "name": "Example",
  "type": "record",
  "fields": [
    {
      "name": "object1",
      "type": {
        "name": "Object1",
        "type": "record",
        "fields": [
          {
            "name": "value1",
            "type": "string"
          },
          {
            "name": "value2",
            "type": "string"
          }
        ]
      }
    }
  ]
}

嵌入式Flume代理

在我的 Java 项目中,我目前正在使用 Apache Flume 嵌入式代理,详情如下;

public static void main(String[] args) {
    final Event event = EventBuilder.withBody("Test", Charset.forName("UTF-8"));

    final Map<String, String> properties = new HashMap<>();
    properties.put("channel.type", "memory");
    properties.put("channel.capacity", "100");
    properties.put("sinks", "sink1");
    properties.put("sink1.type", "avro");
    properties.put("sink1.hostname", "192.168.99.101");
    properties.put("sink1.port", "11111");
    properties.put("sink1.batch-size", "1");
    properties.put("processor.type", "failover");

    final EmbeddedAgent embeddedAgent = new EmbeddedAgent("TestAgent");
    embeddedAgent.configure(properties);
    embeddedAgent.start();

    try {
        embeddedAgent.put(event);
    } catch (EventDeliveryException e) {
        e.printStackTrace();
    }
}

在上面的示例中,我正在创建一个新的 Flume 事件,其中“测试”定义为事件主体,将事件发送到虚拟机内的单独 Apache Flume 代理 运行( 192.168.99.101).

远程Flume代理

如上所述,我已将此代理配置为从嵌入式 Flume 代理接收事件。此代理的 Flume 配置如下所示;

# Name the components on this agent
hello.sources = avroSource
hello.channels = memoryChannel
hello.sinks = loggerSink

# Describe/configure the source
hello.sources.avroSource.type = avro
hello.sources.avroSource.bind = 0.0.0.0
hello.sources.avroSource.port = 11111
hello.sources.avroSource.channels = memoryChannel

# Describe the sink
hello.sinks.loggerSink.type = logger

# Use a channel which buffers events in memory
hello.channels.memoryChannel.type = memory
hello.channels.memoryChannel.capacity = 1000
hello.channels.memoryChannel.transactionCapacity = 1000

# Bind the source and sink to the channel
hello.sources.avroSource.channels = memoryChannel
hello.sinks.loggerSink.channel = memoryChannel

我正在执行以下命令来启动代理;

./bin/flume-ng agent --conf conf --conf-file ../sample-flume.conf --name hello -Dflume.root.logger=TRACE,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true

当我执行 Java 项目主要方法时,我看到“测试”事件通过以下输出传递到我的记录器接收器;

2019-02-18 14:15:09,998 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 54 65 73 74                                     Test }

但是,我不清楚我应该在哪里配置 Avro 架构以确保 Flume 只接收和处理有效事件。有人可以帮我了解我哪里出错了吗?或者,如果我误解了 Flume 旨在将 Flume 事件转换为 Avro 事件的意图?

除上述之外,我还尝试在更改 Avro 模式后使用 Avro RPC 客户端指定一个直接与我的远程 Flume 代理对话的协议,但是当我尝试发送事件时,我看到以下错误;

Exception in thread "main" org.apache.avro.AvroRuntimeException: Not a remote message: test
    at org.apache.avro.ipc.Requestor$Response.getResponse(Requestor.java:532)
    at org.apache.avro.ipc.Requestor$TransceiverCallback.handleResult(Requestor.java:359)
    at org.apache.avro.ipc.Requestor$TransceiverCallback.handleResult(Requestor.java:322)
    at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.messageReceived(NettyTransceiver.java:613)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:595)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:786)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:458)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:439)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:553)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:84)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:471)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:332)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:102)
    at org.jboss.netty.util.internal.DeadLockProofWorker.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

我的目标是我能够确保我的应用程序填充的事件符合为避免发布无效事件而生成的 Avro 架构。我更愿意使用嵌入式 Flume 代理来实现这一点,但如果这不可能,那么我会考虑使用 Avro RPC 方法直接与我的远程 Flume 代理对话。

任何帮助/指导都会有很大帮助。提前致谢。

更新

进一步阅读后,我想知道我是否误解了 Apache Flume 的目的。我最初认为这可以用于根据数据/模式自动创建 Avro 事件,但现在想知道应用程序是否应该承担产生 Avro 事件的责任,这些事件将根据通道配置存储在 Flume 中并作为通过接收器进行批处理(在我的例子中是 Spark Streaming 集群)。

如果以上是正确的,那么我想知道 Flume 是否需要了解架构,或者只是我的 Spark Streaming 集群最终会处理这些数据?如果 Flume 需要了解架构,那么您能否详细说明如何实现这一点?

提前致谢。

由于您的目标是使用 Spark Streaming 集群处理数据,您可以使用 2 种解决方案解决此问题

1) 使用 Flume 客户端(使用 flume-ng-sdk 1.9.0 测试)和 Spark Streaming(使用 spark-streaming_2.11 2.4.0 和 [=31 测试=].11 2.3.0) 网络拓扑之间没有 Flume 服务器。

客户端 class 在端口 41416

发送 Flume json 事件
  public class JSONFlumeClient {
    public static void main(String[] args) {
    RpcClient client = RpcClientFactory.getDefaultInstance("localhost", 41416);
    String jsonData = "{\r\n" + "  \"namespace\": \"com.example.avro\",\r\n" + "  \"name\": \"Example\",\r\n"
            + "  \"type\": \"record\",\r\n" + "  \"fields\": [\r\n" + "    {\r\n"
            + "      \"name\": \"object1\",\r\n" + "      \"type\": {\r\n" + "        \"name\": \"Object1\",\r\n"
            + "        \"type\": \"record\",\r\n" + "        \"fields\": [\r\n" + "          {\r\n"
            + "            \"name\": \"value1\",\r\n" + "            \"type\": \"string\"\r\n" + "          },\r\n"
            + "          {\r\n" + "            \"name\": \"value2\",\r\n" + "            \"type\": \"string\"\r\n"
            + "          }\r\n" + "        ]\r\n" + "      }\r\n" + "    }\r\n" + "  ]\r\n" + "}";
    Event event = EventBuilder.withBody(jsonData, Charset.forName("UTF-8"));
    try {
        client.append(event);
    } catch (Throwable t) {
        System.err.println(t.getMessage());
        t.printStackTrace();
    } finally {
        client.close();
    }
  }
}

Spark Streaming Server class 侦听端口 41416

public class SparkStreamingToySample {
  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setMaster("local[2]")
    .setAppName("SparkStreamingToySample");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(30));
    JavaReceiverInputDStream<SparkFlumeEvent> lines = FlumeUtils
    .createStream(ssc, "localhost", 41416);
    lines.map(sfe -> new String(sfe.event().getBody().array(), "UTF-8"))
    .foreachRDD((data,time)->
    System.out.println("***" + new Date(time.milliseconds()) + "=" + data.collect().toString()));
    ssc.start();
    ssc.awaitTermination();
  }
}

2) 使用Flume客户端+Flume服务器+Spark Streaming(作为FlumeSink)作为网络拓扑。

对于此选项,代码是相同的,但现在 SparkStreaming 必须指定完整的 dns 限定主机名而不是 localhost 才能在同一端口 41416 启动 SparkStreaming 服务器,如果您 运行 在本地进行测试. Flume 客户端将连接到 flume 服务器端口 41415。现在棘手的部分是如何定义 flume 拓扑。您需要同时指定源和接收器才能使其正常工作。

请参阅下面的 flume 配置文件

agent1.channels.ch1.type = memory

agent1.sources.avroSource1.channels = ch1
agent1.sources.avroSource1.type = avro
agent1.sources.avroSource1.bind = 0.0.0.0
agent1.sources.avroSource1.port = 41415

agent1.sinks.avroSink.channel = ch1
agent1.sinks.avroSink.type = avro
agent1.sinks.avroSink.hostname = <full dns qualified hostname>
agent1.sinks.avroSink.port = 41416

agent1.channels = ch1
agent1.sources = avroSource1
agent1.sinks = avroSink

你应该用这两种解决方案得到相同的结果,但是回到你的问题,即 Json 流中的 Spark Streaming 内容是否真的需要 Flume,答案取决于,Flume 支持拦截器,因此在这种情况下,它可用于清理或过滤 Spark 项目的无效数据,但由于您要向拓扑添加额外的组件,它可能会影响性能并需要更多资源 (CPU/Memory)比没有 Flume.