My custom Flink metrics reporter doesn't work
I'm using Flink 1.13.1.
I wrote a custom metrics reporter, but it doesn't seem to work in my Flink setup.
When I start Flink, the JobManager logs the following warning:
2021-08-25 14:54:06,243 WARN org.apache.flink.runtime.metrics.ReporterSetup [] - The reporter factory (org.apache.flink.metrics.kafka.KafkaReporterFactory) could not be found for reporter kafka. Available factories: [org.apache.flink.metrics.slf4j.Slf4jReporterFactory, org.apache.flink.metrics.datadog.DatadogHttpReporterFactory, org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory, org.apache.flink.metrics.graphite.GraphiteReporterFactory, org.apache.flink.metrics.statsd.StatsDReporterFactory, org.apache.flink.metrics.prometheus.PrometheusReporterFactory, org.apache.flink.metrics.jmx.JMXReporterFactory, org.apache.flink.metrics.influxdb.InfluxdbReporterFactory].
2021-08-25 14:54:06,245 INFO org.apache.flink.runtime.metrics.MetricRegistryImpl [] - No metrics reporter configured, no metrics will be exposed/reported.
However, I created a folder named 'metrics-kafka' under the plugins folder, packaged my metrics reporter, and copied the jar into 'metrics-kafka' or into the lib folder (neither location works).
My Flink configuration:
metrics.reporter.kafka.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
metrics.reporter.kafka.class: org.apache.flink.metrics.kafka.KafkaReporter
metrics.reporter.kafka.interval: 15 SECONDS
My metrics reporter factory class:
package org.apache.flink.metrics.kafka

import org.apache.flink.metrics.reporter.{InterceptInstantiationViaReflection, MetricReporter, MetricReporterFactory}
import java.util.Properties

@InterceptInstantiationViaReflection(reporterClassName = "org.apache.flink.metrics.kafka.KafkaReporter")
class KafkaReporterFactory extends MetricReporterFactory {
  override def createMetricReporter(properties: Properties): MetricReporter = {
    new KafkaReporter()
  }
}
And my reporter class:
package org.apache.flink.metrics.kafka

import org.apache.flink.metrics.MetricConfig
import org.apache.flink.metrics.reporter.{InstantiateViaFactory, Scheduled}

@InstantiateViaFactory(factoryClassName = "org.apache.flink.metrics.kafka.KafkaReporterFactory")
class KafkaReporter extends MyAbstractReporter with Scheduled {
  // some code ...
}
I found that I needed to add a file named 'org.apache.flink.metrics.reporter.MetricReporterFactory' under '/resources/META-INF/services/' and write the factory class's fully qualified name in it, e.g. 'org.apache.flink.metrics.kafka.KafkaReporterFactory'.
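For illustration, here is a sketch of what that service file might look like. It assumes a standard Maven or sbt project layout, where resources live under src/main/resources (that directory is my assumption, not something stated above). The file name is the fully qualified name of the factory interface, and its content is a single line naming the implementation:

```text
# File: src/main/resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
# (lines starting with '#' are comments in Java service files)
org.apache.flink.metrics.kafka.KafkaReporterFactory
```

This is the standard Java ServiceLoader convention, which would explain why the factory was missing from the "Available factories" list in the warning: without this file in the jar, the class is never registered as a provider, regardless of whether the jar sits in plugins/metrics-kafka or lib.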