How to transform a log4j message to fit an avro schema and post to kafka

I'm working on a system that sends the logs from all of our microservices to a single Apache Kafka topic. Most of the services are in Python, but we are now forwarding logs from a Streams application as well. All the other services use the same schema, defined in Avro and managed by Confluent's Schema Registry. I can publish the data to Kafka fine as a string, but can't figure out how to publish a valid Avro object tied to a Schema Registry schema. I'm currently trying to do this with a custom log4j plugin. For testing purposes I write these logs to their own topic and read them with kcat -b localhost:9092 -s value=avro -r localhost:8081 -t new_logs -f 'key: %k value: %s Partition: %p\n\n', but I get

ERROR: Failed to format message in new_logs [0] at offset 0: Avro/Schema-registry message deserialization: Invalid CP1 magic byte 115, expected 0: message not produced with Schema-Registry Avro framing: terminating

when doing this (the kcat command works fine against my actual service-log topic and every other topic that uses valid Avro). Originally I tried to use the org.apache.avro.generic.GenericData.Record class, but I couldn't figure out how to make it work in the toSerializable and toByteArray methods required by the AbstractLayout interface, since that class does not implement Serializable.
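For context on the error: kcat's -s value=avro expects Confluent's Schema-Registry wire format, which prepends a magic byte and the registered schema ID to the binary-encoded record. A minimal sketch of that framing (schemaId and avroBinaryBody are placeholders, not part of my code):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class WireFormatSketch {
    // Confluent framing: magic byte 0, then the 4-byte big-endian schema ID,
    // then the record body encoded with Avro's BinaryEncoder.
    static byte[] frame(int schemaId, byte[] avroBinaryBody) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0);                                               // magic byte, always 0
        out.write(ByteBuffer.allocate(4).putInt(schemaId).array()); // ID assigned by the registry
        out.write(avroBinaryBody);                                  // Avro binary payload
        return out.toByteArray();
    }
}

Below are the plugin class and the log4j configuration.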

ServiceLogLayout.java

@Plugin(name="ServiceLogLayout", category= Node.CATEGORY, elementType = Layout.ELEMENT_TYPE, printObject = true)
public class ServiceLogLayout extends AbstractLayout<byte[]> {
    Schema record;
    DatumWriter<GenericData.Record> serviceLogDatumWriter;

    public ServiceLogLayout() {
        // maybe set these for avro
        super(null, null, null);
        Schema timestampMilliType = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));

        // CREATE SCHEMA
        Schema.Field service = new Schema.Field("service", SchemaBuilder.builder().stringType(), "Name of service sending this message");
        Schema.Field environment = new Schema.Field("environment", SchemaBuilder.builder().enumeration("environment_type").symbols("local", "dev", "staging", "prod", "shared_services", "testing", "security"));
        Schema.Field level = new Schema.Field("level", SchemaBuilder.builder().enumeration("level_type").symbols("debug", "info", "notice", "warning", "error", "critical", "alert", "emergency"), "logging level");
        Schema.Field msg = new Schema.Field("msg", SchemaBuilder.builder().stringType(), "Required log message");

        List<Schema.Field> fields = new ArrayList<>();
        fields.add(service);
        fields.add(environment);
        fields.add(level);
        fields.add(msg);
        this.record = Schema.createRecord("service_logs", "", "com.test.avro", false, fields);
        this.serviceLogDatumWriter = new GenericDatumWriter<>(this.record);
    }

    @Override
    public byte[] toByteArray(LogEvent event) {
        LOGGER.warn("toByteArray");

        String env = System.getenv("ENVIRONMENT") != null ? System.getenv("ENVIRONMENT").toLowerCase() : "local";
        // FILL IN RECORD
        GenericRecordBuilder schemaBuilder = new GenericRecordBuilder(this.record);
        schemaBuilder.set("service", "testService");
        schemaBuilder.set("environment", new GenericData.EnumSymbol(this.record.getField("environment").schema(), env));
        schemaBuilder.set("level", new GenericData.EnumSymbol(this.record.getField("level").schema(), event.getLevel().name().toLowerCase()));
        schemaBuilder.set("msg", event.getMessage().getFormattedMessage());
        
        // SERIALIZE
        byte[] data = new byte[0];
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        Encoder jsonEncoder = null;
        try {
            jsonEncoder = EncoderFactory.get().jsonEncoder(
                    this.record, stream);
            this.serviceLogDatumWriter.write(schemaBuilder.build(), jsonEncoder);
            jsonEncoder.flush();
            data = stream.toByteArray();
        } catch (IOException e) {
            LOGGER.error("Serialization error:" + e.getMessage());
        }
        return data;
    }

    @Override
    public byte[] toSerializable(LogEvent event) {
        return toByteArray(event);
    }

    @Override
    public String getContentType() {
        return null;
    }

    @PluginFactory
    public static Layout<?> createLayout() {
        return new ServiceLogLayout();
    }

    // from the attempt to serialize via ObjectOutputStream mentioned above; unused in the end
    private static class PrivateObjectOutputStream extends ObjectOutputStream {

        public PrivateObjectOutputStream(final OutputStream os) throws IOException {
            super(os);
        }

        @Override
        protected void writeStreamHeader() {
            // do nothing
        }
    }

}
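For what it's worth, even swapping the jsonEncoder for a binary encoder would not have fixed the kcat error: the bytes would be valid Avro, but still without the magic-byte/schema-ID framing. A hypothetical variant of the SERIALIZE block above (same fields as the class, exception handling omitted):

// Binary instead of JSON encoding: valid Avro bytes, but still no
// Schema-Registry framing, so kcat rejects it all the same.
ByteArrayOutputStream stream = new ByteArrayOutputStream();
BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(stream, null);
this.serviceLogDatumWriter.write(schemaBuilder.build(), binaryEncoder);
binaryEncoder.flush();
byte[] data = stream.toByteArray();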

log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN" packages="logging.log4j.custom.plugins">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <ServiceLogLayout />

        </Console>
        <Kafka name="Kafka" topic="new_logs">
            <ServiceLogLayout />
            <Property name="bootstrap.servers">${env:BOOTSTRAP_SERVERS}</Property>
        </Kafka>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="Kafka"/>
        </Root>
        <Logger name="org.apache.kafka" level="WARN"/>
    </Loggers>
</Configuration>

OneCricketeer had the right idea: the jsonEncoder output is plain JSON text with no Schema-Registry framing, which is why kcat rejects it. Letting Confluent's KafkaAvroSerializer do the encoding both registers the schema and adds the framing. Below is the implementation:

@Plugin(name="ServiceLogLayout", category= Node.CATEGORY, elementType = Layout.ELEMENT_TYPE, printObject = true)
public class ServiceLogLayout extends AbstractLayout<byte[]> {
    Schema record;
    SchemaRegistryClient client;
    KafkaAvroSerializer kafkaAvroSerializer;

    public ServiceLogLayout() {
        // maybe set these for avro
        super(null, null, null);
        Schema timestampMilliType = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));

        // CREATE SCHEMA
        Schema.Field service = new Schema.Field("service", SchemaBuilder.builder().stringType(), "Name of service sending this message");
        Schema.Field environment = new Schema.Field("environment", SchemaBuilder.builder().enumeration("environment_type").symbols("local", "dev", "staging", "prod", "shared_services", "testing", "security"));
        Schema.Field level = new Schema.Field("level", SchemaBuilder.builder().enumeration("level_type").symbols("debug", "info", "notice", "warning", "error", "critical", "alert", "emergency"), "logging level");
        Schema.Field msg = new Schema.Field("msg", SchemaBuilder.builder().stringType(), "Required log message");
        Schema.Field data = new Schema.Field("data", SchemaBuilder.builder().nullable().stringType(), "Optional extra data, such as stack frames");
        Schema.Field timestamp = new Schema.Field("timestamp", SchemaBuilder.builder().type(timestampMilliType));

        List<Schema.Field> fields = new ArrayList<>();
        fields.add(service);
        fields.add(environment);
        fields.add(level);
        fields.add(msg);
        fields.add(data);
        fields.add(timestamp);
        this.record = Schema.createRecord("service_logs", "", "com.test.avro", false, fields);

        client = new CachedSchemaRegistryClient("http://schema-registry:8081", 10000);
        // reuse a single serializer rather than constructing one per log event
        kafkaAvroSerializer = new KafkaAvroSerializer(client);
    }

    @Override
    public byte[] toByteArray(LogEvent event) {

        String env = System.getenv("ENVIRONMENT") != null ? System.getenv("ENVIRONMENT").toLowerCase() : "local";
        // FILL IN RECORD
        GenericRecordBuilder schemaBuilder = new GenericRecordBuilder(this.record);
        schemaBuilder.set("service", "testService");
        schemaBuilder.set("environment", new GenericData.EnumSymbol(this.record.getField("environment").schema(), env));
        schemaBuilder.set("level", new GenericData.EnumSymbol(this.record.getField("level").schema(), event.getLevel().name()));
        schemaBuilder.set("msg", event.getMessage().getFormattedMessage());
        schemaBuilder.set("data", null);
        schemaBuilder.set("timestamp", event.getTimeMillis());

        // SERIALIZE: the serializer registers the schema under subject "service_logs-value"
        // and prepends the Schema-Registry framing (magic byte + schema ID)
        byte[] data = kafkaAvroSerializer.serialize("service_logs", schemaBuilder.build());

        return data;
    }

    @Override
    public byte[] toSerializable(LogEvent event) {
        return toByteArray(event);
    }

    @Override
    public String getContentType() {
        return null;
    }

    @PluginFactory
    public static Layout<?> createLayout() {
        return new ServiceLogLayout();
    }

    // unused here; carried over from the earlier Serializable-based attempt
    private static class PrivateObjectOutputStream extends ObjectOutputStream {

        public PrivateObjectOutputStream(final OutputStream os) throws IOException {
            super(os);
        }

        @Override
        protected void writeStreamHeader() {
            // do nothing
        }
    }

}
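For a quick end-to-end check, a minimal sketch of exercising the layout (class name and messages are illustrative; it assumes the log4j2.xml above is on the classpath and ENVIRONMENT and BOOTSTRAP_SERVERS are set):

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class LayoutSmokeTest {
    private static final Logger LOG = LogManager.getLogger(LayoutSmokeTest.class);

    public static void main(String[] args) {
        // each event is built into a GenericData.Record by ServiceLogLayout,
        // framed by KafkaAvroSerializer, and appended to the new_logs topic
        LOG.info("service started");
        LOG.error("something went wrong");
    }
}

Note that KafkaAvroSerializer and CachedSchemaRegistryClient come from Confluent's kafka-avro-serializer artifact, which is published in Confluent's Maven repository rather than Maven Central.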

It's worth noting that using Logstash would probably also be a good solution.