How to transform a log4j message to fit an avro schema and post to kafka
I am working on a system that sends all logs from all of our microservices to a single Apache Kafka topic. Most of the services are written in Python, but we are now also forwarding logs from a Kafka Streams application. All of the other services use the same schema, defined in Avro and managed by Confluent's Schema Registry. I can post the data to Kafka fine as a string, but I can't figure out how to publish a valid Avro object linked to a Schema Registry schema. I am currently trying to do this through a custom log4j plugin. For testing purposes I write these logs to their own topic and read them back with
kcat -b localhost:9092 -s value=avro -r localhost:8081 -t new_logs -f 'key: %k value: %s Partition: %p\n\n'
but I get
ERROR: Failed to format message in new_logs [0] at offset 0: Avro/Schema-registry message deserialization: Invalid CP1 magic byte 115, expected 0: message not produced with Schema-Registry Avro framing: terminating
when doing so (the kcat command does work for my actual service log topic and every other topic that uses valid Avro). Initially I tried using the org.apache.avro.generic.GenericData.Record class, but I couldn't figure out how to make it work in toSerializable
and toByteArray
, the methods required by the AbstractLayout interface, since that class does not implement Serializable. Below are the plugin class definition and the log4j configuration.
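For reference, the framing kcat expects here is Confluent's Schema Registry wire format: a zero magic byte, a 4-byte big-endian schema ID, and then the binary (not JSON) Avro encoding of the record. A minimal sketch of that framing, assuming schemaId has already been obtained from the registry (the helper name is hypothetical):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

// Hypothetical helper: frames an Avro record in the Confluent wire format.
static byte[] frameForSchemaRegistry(Schema schema, GenericRecord record, int schemaId) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(0);                                                     // magic byte, must be 0
    out.write(ByteBuffer.allocate(4).putInt(schemaId).array(), 0, 4); // 4-byte big-endian schema ID
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema).write(record, encoder); // binary body, not JSON
    encoder.flush();
    return out.toByteArray();                                         // what kcat -s value=avro expects
}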
ServiceLogLayout.java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.config.Node;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.layout.AbstractLayout;

@Plugin(name = "ServiceLogLayout", category = Node.CATEGORY, elementType = Layout.ELEMENT_TYPE, printObject = true)
public class ServiceLogLayout extends AbstractLayout<byte[]> {
    Schema record;
    DatumWriter<GenericData.Record> serviceLogDatumWriter;

    public ServiceLogLayout() {
        // maybe set these for avro
        super(null, null, null);
        Schema timestampMilliType = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
        // CREATE SCHEMA
        Schema.Field service = new Schema.Field("service", SchemaBuilder.builder().stringType(), "Name of service sending this message");
        Schema.Field environment = new Schema.Field("environment", SchemaBuilder.builder().enumeration("environment_type").symbols("local", "dev", "staging", "prod", "shared_services", "testing", "security"));
        Schema.Field level = new Schema.Field("level", SchemaBuilder.builder().enumeration("level_type").symbols("debug", "info", "notice", "warning", "error", "critical", "alert", "emergency"), "logging level");
        Schema.Field msg = new Schema.Field("msg", SchemaBuilder.builder().stringType(), "Required log message");
        List<Schema.Field> fields = new ArrayList<>();
        fields.add(service);
        fields.add(environment);
        fields.add(level);
        fields.add(msg);
        this.record = Schema.createRecord("service_logs", "", "com.test.avro", false, fields);
        this.serviceLogDatumWriter = new GenericDatumWriter<>(this.record);
    }

    @Override
    public byte[] toByteArray(LogEvent event) {
        LOGGER.warn("toByteArray");
        String env = System.getenv("ENVIRONMENT") != null ? System.getenv("ENVIRONMENT").toLowerCase() : "local";
        // FILL IN RECORD
        GenericRecordBuilder schemaBuilder = new GenericRecordBuilder(this.record);
        schemaBuilder.set("service", "testService");
        schemaBuilder.set("environment", new GenericData.EnumSymbol(this.record.getField("environment").schema(), env));
        schemaBuilder.set("level", new GenericData.EnumSymbol(this.record.getField("level").schema(), event.getLevel().name().toLowerCase()));
        schemaBuilder.set("msg", event.getMessage().getFormattedMessage());
        // SERIALIZE: JSON-encodes the record, which produces no
        // Schema-Registry framing -- this is what kcat rejects
        byte[] data = new byte[0];
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        Encoder jsonEncoder = null;
        try {
            jsonEncoder = EncoderFactory.get().jsonEncoder(this.record, stream);
            this.serviceLogDatumWriter.write(schemaBuilder.build(), jsonEncoder);
            jsonEncoder.flush();
            data = stream.toByteArray();
        } catch (IOException e) {
            LOGGER.error("Serialization error:" + e.getMessage());
        }
        return data;
    }

    @Override
    public byte[] toSerializable(LogEvent event) {
        return toByteArray(event);
    }

    @Override
    public String getContentType() {
        return null;
    }

    @PluginFactory
    public static Layout<?> createLayout() {
        return new ServiceLogLayout();
    }

    // Unused in this layout
    private static class PrivateObjectOutputStream extends ObjectOutputStream {
        public PrivateObjectOutputStream(final OutputStream os) throws IOException {
            super(os);
        }

        @Override
        protected void writeStreamHeader() {
            // do nothing
        }
    }
}
log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN" packages="logging.log4j.custom.plugins">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <ServiceLogLayout />
        </Console>
        <Kafka name="Kafka" topic="new_logs">
            <ServiceLogLayout />
            <Property name="bootstrap.servers">${env:BOOTSTRAP_SERVERS}</Property>
        </Kafka>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="Kafka"/>
        </Root>
        <Logger name="org.apache.kafka" level="WARN"/>
    </Loggers>
</Configuration>
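For completeness, once the plugin is on the classpath and the packages attribute covers its package, an ordinary Log4j call goes through this layout; the class and message below are illustrative only:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class Example {
    private static final Logger LOGGER = LogManager.getLogger(Example.class);

    public static void main(String[] args) {
        // Routed to both appenders; the Kafka appender publishes the
        // layout's byte[] to the new_logs topic.
        LOGGER.info("service started");
    }
}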
OneCricketeer had the right idea. Below is the implementation:
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.config.Node;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.layout.AbstractLayout;

@Plugin(name = "ServiceLogLayout", category = Node.CATEGORY, elementType = Layout.ELEMENT_TYPE, printObject = true)
public class ServiceLogLayout extends AbstractLayout<byte[]> {
    Schema record;
    SchemaRegistryClient client;
    Schema.Parser parser; // unused

    public ServiceLogLayout() {
        // maybe set these for avro
        super(null, null, null);
        Schema timestampMilliType = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
        // CREATE SCHEMA
        Schema.Field service = new Schema.Field("service", SchemaBuilder.builder().stringType(), "Name of service sending this message");
        Schema.Field environment = new Schema.Field("environment", SchemaBuilder.builder().enumeration("environment_type").symbols("local", "dev", "staging", "prod", "shared_services", "testing", "security"));
        Schema.Field level = new Schema.Field("level", SchemaBuilder.builder().enumeration("level_type").symbols("debug", "info", "notice", "warning", "error", "critical", "alert", "emergency"), "logging level");
        Schema.Field msg = new Schema.Field("msg", SchemaBuilder.builder().stringType(), "Required log message");
        Schema.Field data = new Schema.Field("data", SchemaBuilder.builder().nullable().stringType(), "Optional extra data, such as stack frames");
        Schema.Field timestamp = new Schema.Field("timestamp", SchemaBuilder.builder().type(timestampMilliType));
        List<Schema.Field> fields = new ArrayList<>();
        fields.add(service);
        fields.add(environment);
        fields.add(level);
        fields.add(msg);
        fields.add(data);
        fields.add(timestamp);
        this.record = Schema.createRecord("service_logs", "", "com.test.avro", false, fields);
        client = new CachedSchemaRegistryClient("http://schema-registry:8081", 10000);
        parser = new Schema.Parser();
    }

    @Override
    public byte[] toByteArray(LogEvent event) {
        String env = System.getenv("ENVIRONMENT") != null ? System.getenv("ENVIRONMENT").toLowerCase() : "local";
        // FILL IN RECORD
        GenericRecordBuilder schemaBuilder = new GenericRecordBuilder(this.record);
        schemaBuilder.set("service", "testService");
        schemaBuilder.set("environment", new GenericData.EnumSymbol(this.record.getField("environment").schema(), env));
        // the enum symbols are lowercase, so the level name must be lowercased too
        schemaBuilder.set("level", new GenericData.EnumSymbol(this.record.getField("level").schema(), event.getLevel().name().toLowerCase()));
        schemaBuilder.set("msg", event.getMessage().getFormattedMessage());
        schemaBuilder.set("data", null);
        schemaBuilder.set("timestamp", event.getTimeMillis());
        // SERIALIZE: KafkaAvroSerializer registers the schema against the
        // registry and prepends the Schema-Registry wire framing
        byte[] data;
        KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer(client);
        data = kafkaAvroSerializer.serialize("service_logs", schemaBuilder.build());
        return data;
    }

    @Override
    public byte[] toSerializable(LogEvent event) {
        return toByteArray(event);
    }

    @Override
    public String getContentType() {
        return null;
    }

    @PluginFactory
    public static Layout<?> createLayout() {
        return new ServiceLogLayout();
    }

    // Unused in this layout
    private static class PrivateObjectOutputStream extends ObjectOutputStream {
        public PrivateObjectOutputStream(final OutputStream os) throws IOException {
            super(os);
        }

        @Override
        protected void writeStreamHeader() {
            // do nothing
        }
    }
}
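One possible refinement, an assumption rather than part of the original answer: the KafkaAvroSerializer above is constructed on every log event, but it can be created once in the constructor and reused, with its configuration passed explicitly. A sketch:

import java.util.HashMap;
import java.util.Map;

// Build the serializer once (e.g. as a field initialized in the constructor)
// instead of per event. "schema.registry.url" and "auto.register.schemas"
// are standard Confluent serializer config keys.
Map<String, Object> serializerConfig = new HashMap<>();
serializerConfig.put("schema.registry.url", "http://schema-registry:8081");
serializerConfig.put("auto.register.schemas", true);
this.serializer = new KafkaAvroSerializer(client, serializerConfig);
// ...and then in toByteArray():
// return this.serializer.serialize("service_logs", schemaBuilder.build());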
It's worth noting that using Logstash might also be a good solution here.