The Kafka topic exists and a Java consumer program finds it, but lists none of its content, while kafka-console-consumer can
This is my first Kafka program.
From a kafka_2.13-3.1.0 instance, I created a Kafka topic poids_garmin_brut and filled it with this csv:
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic poids_garmin_brut
kafka-console-producer.sh --broker-list localhost:9092 --topic poids_garmin_brut < "Poids(1).csv"
Durée,Poids,Variation,IMC,Masse grasse,Masse musculaire squelettique,Masse osseuse,Masse hydrique,
" 14 Fév. 2022",
06:37,72.1 kg,0.3 kg,22.8,26.3 %,29.7 kg,3.5 kg,53.8 %,
" 13 Fév. 2022",
06:48,72.4 kg,0.2 kg,22.9,25.4 %,29.8 kg,3.6 kg,54.4 %,
" 12 Fév. 2022",
06:17,72.2 kg,0.0 kg,22.8,25.3 %,29.7 kg,3.6 kg,54.5 %,
[...]
Now, at any time before or after running the program I'm about to show, its content can be displayed with the kafka-console-consumer command:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic poids_garmin_brut --from-beginning
Durée,Poids,Variation,IMC,Masse grasse,Masse musculaire squelettique,Masse osseuse,Masse hydrique,
" 14 Fév. 2022",
06:37,72.1 kg,0.3 kg,22.8,26.3 %,29.7 kg,3.5 kg,53.8 %,
" 13 Fév. 2022",
06:48,72.4 kg,0.2 kg,22.9,25.4 %,29.8 kg,3.6 kg,54.4 %,
" 12 Fév. 2022",
06:17,72.2 kg,0.0 kg,22.8,25.3 %,29.7 kg,3.6 kg,54.5 %,
" 11 Fév. 2022",
05:54,72.2 kg,0.1 kg,22.8,25.6 %,29.7 kg,3.5 kg,54.3 %,
" 10 Fév. 2022",
06:14,72.3 kg,0.0 kg,22.8,25.9 %,29.7 kg,3.5 kg,54.1 %,
" 9 Fév. 2022",
06:06,72.3 kg,0.5 kg,22.8,26.3 %,29.7 kg,3.5 kg,53.8 %,
" 8 Fév. 2022",
07:14,71.8 kg,0.7 kg,22.7,26.3 %,29.6 kg,3.5 kg,53.8 %,
Here is the Java program, based on the org.apache.kafka:kafka-streams:3.1.0 dependency, that extracts this topic as a stream:
package extracteur.garmin;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.*;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.Properties;

@SpringBootApplication
public class Kafka {
   /** Logger. */
   private static final Logger LOGGER = LoggerFactory.getLogger(Kafka.class);

   public static void main(String[] args) {
      LOGGER.info("L'extracteur de données Garmin démarre...");

      /* The input CSV file data looks like this:
         Durée,Poids,Variation,IMC,Masse grasse,Masse musculaire squelettique,Masse osseuse,Masse hydrique,
         " 14 Fév. 2022",
         06:37,72.1 kg,0.3 kg,22.8,26.3 %,29.7 kg,3.5 kg,53.8 %,
         " 13 Fév. 2022",
         06:48,72.4 kg,0.2 kg,22.9,25.4 %,29.8 kg,3.6 kg,54.4 %,
       */

      // Create a stream with no key and a character-string value.
      StreamsBuilder builder = new StreamsBuilder();
      KStream<Void,String> stream = builder.stream("poids_garmin_brut");

      // This is Kafka's foreach, not a Java lambda one. It is lazy.
      stream.foreach((key, value) -> {
         LOGGER.info(value);
      });

      KafkaStreams streams = new KafkaStreams(builder.build(), config());
      streams.start();

      // Close the Kafka stream when the VM stops, by having it call
      streams.close();
      Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
   }

   /**
    * Startup properties.
    * @return configuration properties.
    */
   private static Properties config() {
      Properties config = new Properties();
      config.put(StreamsConfig.APPLICATION_ID_CONFIG, "dev1");
      config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Void().getClass());
      config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
      return config;
   }
}
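However, although the log doesn't seem to report any error during execution, my program never enters stream.foreach, and so it displays none of the topic's content.
(In this log, I removed the dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088- part of [dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088-StreamThread-1], which you should read as still being there, to keep the SO message short and readable. And org.apache.kafka became o.a.k.)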
/usr/lib/jvm/java-1.11.0-openjdk-amd64/bin/java -XX:TieredStopAtLevel=1 -noverify -Dspring.output.ansi.enabled=always -Dcom.sun.management.jmxremote -Dspring.jmx.enabled=true -Dspring.liveBeansView.mbeanDomain -Dspring.application.admin.enabled=true -javaagent:/opt/idea-IU-212.5284.40/lib/idea_rt.jar=41397:/opt/idea-IU-212.5284.40/bin -Dfile.encoding=UTF-8 -classpath /home/lebihan/dev/Java/garmin/target/classes:/home/lebihan/.m2/repository/org/slf4j/slf4j-api/1.7.33/slf4j-api-1.7.33.jar:/home/lebihan/.m2/repository/org/slf4j/log4j-over-slf4j/1.7.33/log4j-over-slf4j-1.7.33.jar:/home/lebihan/.m2/repository/ch/qos/logback/logback-classic/1.2.10/logback-classic-1.2.10.jar:/home/lebihan/.m2/repository/ch/qos/logback/logback-core/1.2.10/logback-core-1.2.10.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-starter-web/2.6.3/spring-boot-starter-web-2.6.3.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-starter/2.6.3/spring-boot-starter-2.6.3.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot/2.6.3/spring-boot-2.6.3.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-autoconfigure/2.6.3/spring-boot-autoconfigure-2.6.3.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-starter-logging/2.6.3/spring-boot-starter-logging-2.6.3.jar:/home/lebihan/.m2/repository/org/apache/logging/log4j/log4j-to-slf4j/2.17.1/log4j-to-slf4j-2.17.1.jar:/home/lebihan/.m2/repository/org/apache/logging/log4j/log4j-api/2.17.1/log4j-api-2.17.1.jar:/home/lebihan/.m2/repository/org/slf4j/jul-to-slf4j/1.7.33/jul-to-slf4j-1.7.33.jar:/home/lebihan/.m2/repository/jakarta/annotation/jakarta.annotation-api/1.3.5/jakarta.annotation-api-1.3.5.jar:/home/lebihan/.m2/repository/org/yaml/snakeyaml/1.29/snakeyaml-1.29.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-starter-json/2.6.3/spring-boot-starter-json-2.6.3.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/datatype/jackson-datatype-jdk8/2.13
.1/jackson-datatype-jdk8-2.13.1.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/datatype/jackson-datatype-jsr310/2.13.1/jackson-datatype-jsr310-2.13.1.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/module/jackson-module-parameter-names/2.13.1/jackson-module-parameter-names-2.13.1.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-starter-tomcat/2.6.3/spring-boot-starter-tomcat-2.6.3.jar:/home/lebihan/.m2/repository/org/apache/tomcat/embed/tomcat-embed-core/9.0.56/tomcat-embed-core-9.0.56.jar:/home/lebihan/.m2/repository/org/apache/tomcat/embed/tomcat-embed-el/9.0.56/tomcat-embed-el-9.0.56.jar:/home/lebihan/.m2/repository/org/apache/tomcat/embed/tomcat-embed-websocket/9.0.56/tomcat-embed-websocket-9.0.56.jar:/home/lebihan/.m2/repository/org/springframework/spring-web/5.3.15/spring-web-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-beans/5.3.15/spring-beans-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-webmvc/5.3.15/spring-webmvc-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-aop/5.3.15/spring-aop-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-context/5.3.15/spring-context-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-expression/5.3.15/spring-expression-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-core/5.3.15/spring-core-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-jcl/5.3.15/spring-jcl-5.3.15.jar:/home/lebihan/.m2/repository/org/apache/kafka/kafka-streams/3.1.0/kafka-streams-3.1.0.jar:/home/lebihan/.m2/repository/org/apache/kafka/kafka-clients/3.0.0/kafka-clients-3.0.0.jar:/home/lebihan/.m2/repository/com/github/luben/zstd-jni/1.5.0-2/zstd-jni-1.5.0-2.jar:/home/lebihan/.m2/repository/org/lz4/lz4-java/1.7.1/lz4-java-1.7.1.jar:/home/lebihan/.m2/repository/org/xerial/snappy/snappy-java/1.1.8.1/snappy-java-1.1.8.1.jar:/home/lebihan/.m2/repository/org/rocksdb/rocksdbjni/6.22.1.1/rocksdbjni-6.22.
1.1.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.13.1/jackson-annotations-2.13.1.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.13.1/jackson-databind-2.13.1.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.13.1/jackson-core-2.13.1.jar extracteur.garmin.Kafka
07:57:49.720 [main] INFO extracteur.garmin.Kafka - L'extracteur de données Garmin démarre...
07:57:49.747 [main] INFO o.a.k.streams.StreamsConfig - StreamsConfig values:
acceptable.recovery.lag = 10000
application.id = dev1
application.server =
bootstrap.servers = [localhost:9092]
buffered.records.per.partition = 1000
built.in.metrics.version = latest
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class o.a.k.streams.errors.LogAndFailExceptionHandler
default.key.serde = class o.a.k.common.serialization.Serdes$VoidSerde
default.list.key.serde.inner = null
default.list.key.serde.type = null
default.list.value.serde.inner = null
default.list.value.serde.type = null
default.production.exception.handler = class o.a.k.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class o.a.k.streams.processor.FailOnInvalidTimestamp
default.value.serde = class o.a.k.common.serialization.Serdes$StringSerde
max.task.idle.ms = 0
max.warmup.replicas = 2
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
poll.ms = 100
probing.rebalance.interval.ms = 600000
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = -1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
task.timeout.ms = 300000
topology.optimization = none
upgrade.from = null
window.size.ms = null
windowed.inner.class.serde = null
windowstore.changelog.additional.retention.ms = 86400000
07:57:49.760 [main] INFO o.a.k.clients.admin.AdminClientConfig - AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.dns.lookup = use_all_dns_ips
client.id = admin
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
07:57:49.790 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka version: 3.0.0
07:57:49.790 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
07:57:49.790 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka startTimeMs: 1644908269788
07:57:49.793 [main] INFO o.a.k.streams.KafkaStreams - stream-client [dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088] Kafka Streams version: 3.1.0
07:57:49.793 [main] INFO o.a.k.streams.KafkaStreams - stream-client [dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088] Kafka Streams commit ID: 37edeed0777bacb3
07:57:49.800 [main] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Creating restore consumer client
07:57:49.802 [main] INFO o.a.k.clients.consumer.ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = none
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = StreamThread-1-restore-consumer
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = null
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class o.a.k.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class o.a.k.clients.consumer.RangeAssignor, class o.a.k.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class o.a.k.common.serialization.ByteArrayDeserializer
07:57:49.816 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka version: 3.0.0
07:57:49.816 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
07:57:49.816 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka startTimeMs: 1644908269816
07:57:49.818 [main] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Creating thread producer client
07:57:49.820 [main] INFO o.a.k.clients.producer.ProducerConfig - ProducerConfig values:
acks = -1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = StreamThread-1-producer
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
interceptor.classes = []
key.serializer = class o.a.k.common.serialization.ByteArraySerializer
linger.ms = 100
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class o.a.k.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class o.a.k.common.serialization.ByteArraySerializer
07:57:49.828 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka version: 3.0.0
07:57:49.828 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
07:57:49.828 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka startTimeMs: 1644908269828
07:57:49.830 [main] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Creating consumer client
07:57:49.831 [main] INFO o.a.k.clients.consumer.ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = false
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = StreamThread-1-consumer
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = dev1
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class o.a.k.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [o.a.k.streams.processor.internals.StreamsPartitionAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class o.a.k.common.serialization.ByteArrayDeserializer
replication.factor = -1
windowstore.changelog.additional.retention.ms = 86400000
07:57:49.836 [main] INFO o.a.k.streams.processor.internals.assignment.AssignorConfiguration - stream-thread [StreamThread-1-consumer] Cooperative rebalancing protocol is enabled now
07:57:49.840 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka version: 3.0.0
07:57:49.840 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
07:57:49.840 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka startTimeMs: 1644908269840
07:57:49.844 [main] INFO o.a.k.streams.KafkaStreams - stream-client [dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088] State transition from CREATED to REBALANCING
07:57:49.845 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Starting
07:57:49.845 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] State transition from CREATED to STARTING
07:57:49.845 [StreamThread-1] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Subscribed to topic(s): poids_garmin_brut
07:57:49.845 [main] INFO o.a.k.streams.KafkaStreams - stream-client [dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088] State transition from REBALANCING to PENDING_SHUTDOWN
07:57:49.846 [kafka-streams-close-thread] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Informed to shut down
07:57:49.846 [kafka-streams-close-thread] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] State transition from STARTING to PENDING_SHUTDOWN
07:57:49.919 [kafka-producer-network-thread | StreamThread-1-producer] INFO o.a.k.clients.Metadata - [Producer clientId=StreamThread-1-producer] Cluster ID: QKJGs4glRAy7besZxXNCrg
07:57:49.920 [StreamThread-1] INFO o.a.k.clients.Metadata - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Cluster ID: QKJGs4glRAy7besZxXNCrg
07:57:49.921 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Discovered group coordinator debian:9092 (id: 2147483647 rack: null)
07:57:49.922 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] (Re-)joining group
07:57:49.929 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Request joining group due to: need to re-join with the given member-id
07:57:49.929 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] (Re-)joining group
07:57:49.930 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Successfully joined group with generation Generation{generationId=3, memberId='StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c', protocol='stream'}
07:57:49.936 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamsPartitionAssignor - stream-thread [StreamThread-1-consumer] All members participating in this rebalance:
d1c8ce47-6fbf-41b7-b8aa-e3d094703088: [StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c].
07:57:49.938 [StreamThread-1] INFO o.a.k.streams.processor.internals.assignment.HighAvailabilityTaskAssignor - Decided on assignment: {d1c8ce47-6fbf-41b7-b8aa-e3d094703088=[activeTasks: ([0_0]) standbyTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) changelogOffsetTotalsByTask: ([]) taskLagTotals: ([]) capacity: 1 assigned: 1]} with no followup probing rebalance.
07:57:49.938 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamsPartitionAssignor - stream-thread [StreamThread-1-consumer] Assigned tasks [0_0] including stateful [] to clients as:
d1c8ce47-6fbf-41b7-b8aa-e3d094703088=[activeTasks: ([0_0]) standbyTasks: ([])].
07:57:49.939 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamsPartitionAssignor - stream-thread [StreamThread-1-consumer] Client d1c8ce47-6fbf-41b7-b8aa-e3d094703088 per-consumer assignment:
prev owned active {}
prev owned standby {StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c=[]}
assigned active {StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c=[0_0]}
revoking active {}
assigned standby {}
07:57:49.939 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamsPartitionAssignor - stream-thread [StreamThread-1-consumer] Finished stable assignment of tasks, no followup rebalances required.
07:57:49.939 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Finished assignment for group at generation 3: {StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c=Assignment(partitions=[poids_garmin_brut-0], userDataSize=52)}
07:57:49.943 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Successfully synced group in generation Generation{generationId=3, memberId='StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c', protocol='stream'}
07:57:49.943 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Updating assignment with
Assigned partitions: [poids_garmin_brut-0]
Current owned partitions: []
Added partitions (assigned - owned): [poids_garmin_brut-0]
Revoked partitions (owned - assigned): []
07:57:49.943 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Notifying assignor about the new Assignment(partitions=[poids_garmin_brut-0], userDataSize=52)
07:57:49.944 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamsPartitionAssignor - stream-thread [StreamThread-1-consumer] No followup rebalance was requested, resetting the rebalance schedule.
07:57:49.944 [StreamThread-1] INFO o.a.k.streams.processor.internals.TaskManager - stream-thread [StreamThread-1] Handle new assignment with:
New active tasks: [0_0]
New standby tasks: []
Existing active tasks: []
Existing standby tasks: []
07:57:49.950 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Adding newly assigned partitions: poids_garmin_brut-0
07:57:49.953 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Found no committed offset for partition poids_garmin_brut-0
07:57:49.954 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Shutting down
[...]
Process finished with exit code 0
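What am I doing wrong?
I'm running my Kafka instance and its Java program locally, on the same PC.
I've tried the 3.1.0 and 2.8.1 versions of Kafka, and removing any trace of [...] from the Java program, without success.
I believe I'm facing a configuration problem.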
The following should work.
LOGGER.info("L'extracteur de données Garmin démarre...");

/* The input CSV file data looks like this:
   Durée,Poids,Variation,IMC,Masse grasse,Masse musculaire squelettique,Masse osseuse,Masse hydrique,
   " 14 Fév. 2022",
   06:37,72.1 kg,0.3 kg,22.8,26.3 %,29.7 kg,3.5 kg,53.8 %,
   " 13 Fév. 2022",
   06:48,72.4 kg,0.2 kg,22.9,25.4 %,29.8 kg,3.6 kg,54.4 %,
 */

// Create a stream with no key and a character-string value.
StreamsBuilder builder = new StreamsBuilder();
builder.stream("poids_garmin_brut")
       .foreach((k, v) -> {
           LOGGER.info(v.toString());
       });

KafkaStreams streams = new KafkaStreams(builder.build(), config());
streams.start();

// Close the Kafka stream when the VM stops, by having it call
//streams.close();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
Output:
2022-02-15 20:05:54 INFO ConsumerCoordinator:291 - [Consumer clientId=dev1-5e3fab76-51c7-41b5-aedf-99a4a071589b-StreamThread-1-consumer, groupId=dev1] Adding newly assigned partitions: poids_garmin_brut-0
2022-02-15 20:05:54 INFO StreamThread:229 - stream-thread [dev1-5e3fab76-51c7-41b5-aedf-99a4a071589b-StreamThread-1] State transition from STARTING to PARTITIONS_ASSIGNED
2022-02-15 20:05:54 INFO ConsumerCoordinator:844 - [Consumer clientId=dev1-5e3fab76-51c7-41b5-aedf-99a4a071589b-StreamThread-1-consumer, groupId=dev1] Setting offset for partition poids_garmin_brut-0 to the committed offset FetchPosition{offset=21, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[LAPTOP-J1JBHQUR:9092 (id: 0 rack: null)], epoch=0}}
2022-02-15 20:05:54 INFO StreamTask:240 - stream-thread [dev1-5e3fab76-51c7-41b5-aedf-99a4a071589b-StreamThread-1] task [0_0] Initialized
2022-02-15 20:05:54 INFO StreamTask:265 - stream-thread [dev1-5e3fab76-51c7-41b5-aedf-99a4a071589b-StreamThread-1] task [0_0] Restored and ready to run
2022-02-15 20:05:54 INFO StreamThread:882 - stream-thread [dev1-5e3fab76-51c7-41b5-aedf-99a4a071589b-StreamThread-1] Restoration took 30 ms for all tasks [0_0]
2022-02-15 20:05:54 INFO StreamThread:229 - stream-thread [dev1-5e3fab76-51c7-41b5-aedf-99a4a071589b-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
2022-02-15 20:05:54 INFO KafkaStreams:332 - stream-client [dev1-5e3fab76-51c7-41b5-aedf-99a4a071589b] State transition from REBALANCING to RUNNING
2022-02-15 20:05:54 INFO KafkaConsumer:2254 - [Consumer clientId=dev1-5e3fab76-51c7-41b5-aedf-99a4a071589b-StreamThread-1-consumer, groupId=dev1] Requesting the log end offset for poids_garmin_brut-0 in order to compute lag
2022-02-15 20:06:03 INFO Main:33 - Test22
2022-02-15 20:06:06 INFO Main:33 - Test23
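I'm ashamed to have found the cause of my trouble:
streams.start();
streams.close();
streams.start() starts listening to the stream, but streams.close() ends it immediately and shuts the program down a second later!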
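The effect described above can be reproduced without Kafka. The sketch below is only an analogy under stated assumptions: FakeStreams is a hypothetical stand-in for KafkaStreams (it is not part of any Kafka API) whose start() returns at once and delivers records on a background thread, and whose close() stops that thread. The 200 ms startup delay, the record values and the latch used for waiting are all illustrative choices.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for KafkaStreams; not a Kafka API. */
class FakeStreams {
    private final List<String> source;
    private final List<String> delivered = new CopyOnWriteArrayList<>();
    private final CountDownLatch allDelivered;
    private volatile boolean closed = false;
    private Thread worker;

    FakeStreams(List<String> source) {
        this.source = source;
        this.allDelivered = new CountDownLatch(source.size());
    }

    /** Returns at once; records are delivered later on a background thread. */
    void start() {
        worker = new Thread(() -> {
            try {
                Thread.sleep(200); // assumed startup delay, standing in for joining the group
            } catch (InterruptedException e) {
                return; // closed while still starting up
            }
            for (String record : source) {
                if (closed) {
                    return;
                }
                delivered.add(record);
                allDelivered.countDown();
            }
        });
        worker.start();
    }

    /** Stops the worker and waits for it to finish. */
    void close() {
        closed = true;
        worker.interrupt();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Mirrors the question's code: close() right after start(). */
    static List<String> closeImmediately(List<String> records) {
        FakeStreams streams = new FakeStreams(records);
        streams.start();
        streams.close();
        return List.copyOf(streams.delivered);
    }

    /** Waits (up to 5 s) until every record has been delivered, then closes. */
    static List<String> waitThenClose(List<String> records) {
        FakeStreams streams = new FakeStreams(records);
        streams.start();
        try {
            streams.allDelivered.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        streams.close();
        return List.copyOf(streams.delivered);
    }

    public static void main(String[] args) {
        List<String> records = List.of("06:37,72.1 kg", "06:48,72.4 kg");
        System.out.println("close at once:    " + closeImmediately(records));
        System.out.println("wait, then close: " + waitThenClose(records));
    }
}
```

In the real program, the counterpart of waiting is to keep main from reaching streams.close() and to leave the close to the shutdown hook that the code already registers.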
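With the streams.close() statement commented out, I see that if new messages are injected into the topic, the first message from the poids_garmin_brut topic takes about 20 seconds to be detected and displayed, and then the other newly injected ones arrive instantly.
I believed the property
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
would ask Kafka to read the topic from the beginning, but it doesn't.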
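One possible explanation, which this thread does not confirm for this particular run: auto.offset.reset is only consulted when the consumer group has no committed offset for a partition. Kafka Streams uses the application.id as its group id (the log above shows group.id = dev1), and the output log in the earlier answer shows the consumer resuming from "the committed offset ... offset=21". The sketch below models that rule in plain Java. OffsetResetSketch and its method are hypothetical names, the offsets are illustrative, and the model is simplified (it ignores the "none" policy and out-of-range offsets).

```java
import java.util.OptionalLong;

/** Simplified, hypothetical model of how a consumer picks its starting offset. */
class OffsetResetSketch {
    enum ResetPolicy { EARLIEST, LATEST }

    /**
     * A committed offset, when present, always wins.
     * The reset policy is only a fallback for a group with no committed offset.
     */
    static long startingOffset(OptionalLong committed, ResetPolicy policy,
                               long logStartOffset, long logEndOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        return policy == ResetPolicy.EARLIEST ? logStartOffset : logEndOffset;
    }

    public static void main(String[] args) {
        // Illustrative offsets: the log starts at 0 and ends at 21.
        long firstRun = startingOffset(OptionalLong.empty(), ResetPolicy.EARLIEST, 0L, 21L);
        long laterRun = startingOffset(OptionalLong.of(21L), ResetPolicy.EARLIEST, 0L, 21L);
        System.out.println("no committed offset, earliest: start at " + firstRun);
        System.out.println("committed offset 21, earliest: start at " + laterRun);
    }
}
```

Under this rule, one way to re-read a topic from the beginning would be to run the application under an application.id that has never committed offsets, so that "earliest" applies again.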
streams.start();
// Fermer le flux Kafka quand la VM s'arrêtera, en faisant appeler
streams.close();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
/**
* Propriétés pour le démarrage.
* @return propriétés de configuration.
*/
private static Properties config() {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "dev1");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Void().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return config;
}
}
但是,虽然日志在执行期间似乎没有报告任何错误,但我的程序没有进入 stream.forEach
,因此:没有显示该主题的任何内容。
(在此日志中,我删除了 [dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088-StreamThread-1]
的 dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088-
部分,您应该在里面阅读,以了解 SO 消息的长度和易读性。并且 org.apache.kafka
变成了 o.a.k.
)。
/usr/lib/jvm/java-1.11.0-openjdk-amd64/bin/java -XX:TieredStopAtLevel=1 -noverify -Dspring.output.ansi.enabled=always -Dcom.sun.management.jmxremote -Dspring.jmx.enabled=true -Dspring.liveBeansView.mbeanDomain -Dspring.application.admin.enabled=true -javaagent:/opt/idea-IU-212.5284.40/lib/idea_rt.jar=41397:/opt/idea-IU-212.5284.40/bin -Dfile.encoding=UTF-8 -classpath /home/lebihan/dev/Java/garmin/target/classes:/home/lebihan/.m2/repository/org/slf4j/slf4j-api/1.7.33/slf4j-api-1.7.33.jar:/home/lebihan/.m2/repository/org/slf4j/log4j-over-slf4j/1.7.33/log4j-over-slf4j-1.7.33.jar:/home/lebihan/.m2/repository/ch/qos/logback/logback-classic/1.2.10/logback-classic-1.2.10.jar:/home/lebihan/.m2/repository/ch/qos/logback/logback-core/1.2.10/logback-core-1.2.10.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-starter-web/2.6.3/spring-boot-starter-web-2.6.3.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-starter/2.6.3/spring-boot-starter-2.6.3.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot/2.6.3/spring-boot-2.6.3.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-autoconfigure/2.6.3/spring-boot-autoconfigure-2.6.3.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-starter-logging/2.6.3/spring-boot-starter-logging-2.6.3.jar:/home/lebihan/.m2/repository/org/apache/logging/log4j/log4j-to-slf4j/2.17.1/log4j-to-slf4j-2.17.1.jar:/home/lebihan/.m2/repository/org/apache/logging/log4j/log4j-api/2.17.1/log4j-api-2.17.1.jar:/home/lebihan/.m2/repository/org/slf4j/jul-to-slf4j/1.7.33/jul-to-slf4j-1.7.33.jar:/home/lebihan/.m2/repository/jakarta/annotation/jakarta.annotation-api/1.3.5/jakarta.annotation-api-1.3.5.jar:/home/lebihan/.m2/repository/org/yaml/snakeyaml/1.29/snakeyaml-1.29.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-starter-json/2.6.3/spring-boot-starter-json-2.6.3.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/datatype/jackson-datatype-jdk8/2.13
.1/jackson-datatype-jdk8-2.13.1.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/datatype/jackson-datatype-jsr310/2.13.1/jackson-datatype-jsr310-2.13.1.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/module/jackson-module-parameter-names/2.13.1/jackson-module-parameter-names-2.13.1.jar:/home/lebihan/.m2/repository/org/springframework/boot/spring-boot-starter-tomcat/2.6.3/spring-boot-starter-tomcat-2.6.3.jar:/home/lebihan/.m2/repository/org/apache/tomcat/embed/tomcat-embed-core/9.0.56/tomcat-embed-core-9.0.56.jar:/home/lebihan/.m2/repository/org/apache/tomcat/embed/tomcat-embed-el/9.0.56/tomcat-embed-el-9.0.56.jar:/home/lebihan/.m2/repository/org/apache/tomcat/embed/tomcat-embed-websocket/9.0.56/tomcat-embed-websocket-9.0.56.jar:/home/lebihan/.m2/repository/org/springframework/spring-web/5.3.15/spring-web-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-beans/5.3.15/spring-beans-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-webmvc/5.3.15/spring-webmvc-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-aop/5.3.15/spring-aop-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-context/5.3.15/spring-context-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-expression/5.3.15/spring-expression-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-core/5.3.15/spring-core-5.3.15.jar:/home/lebihan/.m2/repository/org/springframework/spring-jcl/5.3.15/spring-jcl-5.3.15.jar:/home/lebihan/.m2/repository/org/apache/kafka/kafka-streams/3.1.0/kafka-streams-3.1.0.jar:/home/lebihan/.m2/repository/org/apache/kafka/kafka-clients/3.0.0/kafka-clients-3.0.0.jar:/home/lebihan/.m2/repository/com/github/luben/zstd-jni/1.5.0-2/zstd-jni-1.5.0-2.jar:/home/lebihan/.m2/repository/org/lz4/lz4-java/1.7.1/lz4-java-1.7.1.jar:/home/lebihan/.m2/repository/org/xerial/snappy/snappy-java/1.1.8.1/snappy-java-1.1.8.1.jar:/home/lebihan/.m2/repository/org/rocksdb/rocksdbjni/6.22.1.1/rocksdbjni-6.22.
1.1.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.13.1/jackson-annotations-2.13.1.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.13.1/jackson-databind-2.13.1.jar:/home/lebihan/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.13.1/jackson-core-2.13.1.jar extracteur.garmin.Kafka
07:57:49.720 [main] INFO extracteur.garmin.Kafka - L'extracteur de données Garmin démarre...
07:57:49.747 [main] INFO o.a.k.streams.StreamsConfig - StreamsConfig values:
acceptable.recovery.lag = 10000
application.id = dev1
application.server =
bootstrap.servers = [localhost:9092]
buffered.records.per.partition = 1000
built.in.metrics.version = latest
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class o.a.k.streams.errors.LogAndFailExceptionHandler
default.key.serde = class o.a.k.common.serialization.Serdes$VoidSerde
default.list.key.serde.inner = null
default.list.key.serde.type = null
default.list.value.serde.inner = null
default.list.value.serde.type = null
default.production.exception.handler = class o.a.k.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class o.a.k.streams.processor.FailOnInvalidTimestamp
default.value.serde = class o.a.k.common.serialization.Serdes$StringSerde
max.task.idle.ms = 0
max.warmup.replicas = 2
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
poll.ms = 100
probing.rebalance.interval.ms = 600000
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = -1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
task.timeout.ms = 300000
topology.optimization = none
upgrade.from = null
window.size.ms = null
windowed.inner.class.serde = null
windowstore.changelog.additional.retention.ms = 86400000
07:57:49.760 [main] INFO o.a.k.clients.admin.AdminClientConfig - AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.dns.lookup = use_all_dns_ips
client.id = admin
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
07:57:49.790 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka version: 3.0.0
07:57:49.790 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
07:57:49.790 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka startTimeMs: 1644908269788
07:57:49.793 [main] INFO o.a.k.streams.KafkaStreams - stream-client [dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088] Kafka Streams version: 3.1.0
07:57:49.793 [main] INFO o.a.k.streams.KafkaStreams - stream-client [dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088] Kafka Streams commit ID: 37edeed0777bacb3
07:57:49.800 [main] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Creating restore consumer client
07:57:49.802 [main] INFO o.a.k.clients.consumer.ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = none
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = StreamThread-1-restore-consumer
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = null
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class o.a.k.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class o.a.k.clients.consumer.RangeAssignor, class o.a.k.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class o.a.k.common.serialization.ByteArrayDeserializer
07:57:49.816 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka version: 3.0.0
07:57:49.816 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
07:57:49.816 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka startTimeMs: 1644908269816
07:57:49.818 [main] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Creating thread producer client
07:57:49.820 [main] INFO o.a.k.clients.producer.ProducerConfig - ProducerConfig values:
acks = -1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = StreamThread-1-producer
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
interceptor.classes = []
key.serializer = class o.a.k.common.serialization.ByteArraySerializer
linger.ms = 100
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class o.a.k.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class o.a.k.common.serialization.ByteArraySerializer
07:57:49.828 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka version: 3.0.0
07:57:49.828 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
07:57:49.828 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka startTimeMs: 1644908269828
07:57:49.830 [main] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Creating consumer client
07:57:49.831 [main] INFO o.a.k.clients.consumer.ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = false
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = StreamThread-1-consumer
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = dev1
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class o.a.k.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [o.a.k.streams.processor.internals.StreamsPartitionAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class o.a.k.common.serialization.ByteArrayDeserializer
replication.factor = -1
windowstore.changelog.additional.retention.ms = 86400000
07:57:49.836 [main] INFO o.a.k.streams.processor.internals.assignment.AssignorConfiguration - stream-thread [StreamThread-1-consumer] Cooperative rebalancing protocol is enabled now
07:57:49.840 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka version: 3.0.0
07:57:49.840 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
07:57:49.840 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka startTimeMs: 1644908269840
07:57:49.844 [main] INFO o.a.k.streams.KafkaStreams - stream-client [dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088] State transition from CREATED to REBALANCING
07:57:49.845 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Starting
07:57:49.845 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] State transition from CREATED to STARTING
07:57:49.845 [StreamThread-1] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Subscribed to topic(s): poids_garmin_brut
07:57:49.845 [main] INFO o.a.k.streams.KafkaStreams - stream-client [dev1-d1c8ce47-6fbf-41b7-b8aa-e3d094703088] State transition from REBALANCING to PENDING_SHUTDOWN
07:57:49.846 [kafka-streams-close-thread] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Informed to shut down
07:57:49.846 [kafka-streams-close-thread] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] State transition from STARTING to PENDING_SHUTDOWN
07:57:49.919 [kafka-producer-network-thread | StreamThread-1-producer] INFO o.a.k.clients.Metadata - [Producer clientId=StreamThread-1-producer] Cluster ID: QKJGs4glRAy7besZxXNCrg
07:57:49.920 [StreamThread-1] INFO o.a.k.clients.Metadata - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Cluster ID: QKJGs4glRAy7besZxXNCrg
07:57:49.921 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Discovered group coordinator debian:9092 (id: 2147483647 rack: null)
07:57:49.922 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] (Re-)joining group
07:57:49.929 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Request joining group due to: need to re-join with the given member-id
07:57:49.929 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] (Re-)joining group
07:57:49.930 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Successfully joined group with generation Generation{generationId=3, memberId='StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c', protocol='stream'}
07:57:49.936 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamsPartitionAssignor - stream-thread [StreamThread-1-consumer] All members participating in this rebalance:
d1c8ce47-6fbf-41b7-b8aa-e3d094703088: [StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c].
07:57:49.938 [StreamThread-1] INFO o.a.k.streams.processor.internals.assignment.HighAvailabilityTaskAssignor - Decided on assignment: {d1c8ce47-6fbf-41b7-b8aa-e3d094703088=[activeTasks: ([0_0]) standbyTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) changelogOffsetTotalsByTask: ([]) taskLagTotals: ([]) capacity: 1 assigned: 1]} with no followup probing rebalance.
07:57:49.938 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamsPartitionAssignor - stream-thread [StreamThread-1-consumer] Assigned tasks [0_0] including stateful [] to clients as:
d1c8ce47-6fbf-41b7-b8aa-e3d094703088=[activeTasks: ([0_0]) standbyTasks: ([])].
07:57:49.939 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamsPartitionAssignor - stream-thread [StreamThread-1-consumer] Client d1c8ce47-6fbf-41b7-b8aa-e3d094703088 per-consumer assignment:
prev owned active {}
prev owned standby {StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c=[]}
assigned active {StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c=[0_0]}
revoking active {}
assigned standby {}
07:57:49.939 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamsPartitionAssignor - stream-thread [StreamThread-1-consumer] Finished stable assignment of tasks, no followup rebalances required.
07:57:49.939 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Finished assignment for group at generation 3: {StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c=Assignment(partitions=[poids_garmin_brut-0], userDataSize=52)}
07:57:49.943 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Successfully synced group in generation Generation{generationId=3, memberId='StreamThread-1-consumer-34c0df37-baeb-4582-bdfe-79ab9e2e410c', protocol='stream'}
07:57:49.943 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Updating assignment with
Assigned partitions: [poids_garmin_brut-0]
Current owned partitions: []
Added partitions (assigned - owned): [poids_garmin_brut-0]
Revoked partitions (owned - assigned): []
07:57:49.943 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Notifying assignor about the new Assignment(partitions=[poids_garmin_brut-0], userDataSize=52)
07:57:49.944 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamsPartitionAssignor - stream-thread [StreamThread-1-consumer] No followup rebalance was requested, resetting the rebalance schedule.
07:57:49.944 [StreamThread-1] INFO o.a.k.streams.processor.internals.TaskManager - stream-thread [StreamThread-1] Handle new assignment with:
New active tasks: [0_0]
New standby tasks: []
Existing active tasks: []
Existing standby tasks: []
07:57:49.950 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Adding newly assigned partitions: poids_garmin_brut-0
07:57:49.953 [StreamThread-1] INFO o.a.k.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=StreamThread-1-consumer, groupId=dev1] Found no committed offset for partition poids_garmin_brut-0
07:57:49.954 [StreamThread-1] INFO o.a.k.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Shutting down
[...]
Process finished with exit code 0
我做错了什么?
我是 运行 我的 Kafka 实例及其 Java 本地程序, 在同一台电脑上。
我体验过
3.1.0
和2.8.1
版本的 Kafka,或者删除了 [ 的任何痕迹=73=] 在 Java 程序中没有成功。
我相信我遇到了配置问题。
以下应该有效。
LOGGER.info("L'extracteur de données Garmin démarre...");
/* Les données du fichier CSV d'entrée sont sous cette forme :
Durée,Poids,Variation,IMC,Masse grasse,Masse musculaire squelettique,Masse osseuse,Masse hydrique,
" 14 Fév. 2022",
06:37,72.1 kg,0.3 kg,22.8,26.3 %,29.7 kg,3.5 kg,53.8 %,
" 13 Fév. 2022",
06:48,72.4 kg,0.2 kg,22.9,25.4 %,29.8 kg,3.6 kg,54.4 %,
*/
// Création d'un flux sans clef et valeur : chaîne de caractères.
StreamsBuilder builder = new StreamsBuilder();
builder.stream("poids_garmin_brut")
.foreach((k, v) -> {
LOGGER.info(v.toString());
});
KafkaStreams streams = new KafkaStreams(builder.build(), config());
streams.start();
// Fermer le flux Kafka quand la VM s'arrêtera, en faisant appeler
//streams.close();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
输出
2022-02-15 20:05:54 INFO ConsumerCoordinator:291 - [Consumer clientId=dev1-5e3fab76-51c7-41b5-aedf-99a4a071589b-StreamThread-1-consumer, groupId=dev1] Adding newly assigned partitions: poids_garmin_brut-0
2022-02-15 20:05:54 INFO StreamThread:229 - stream-thread [dev1-5e3fab76-51c7-41b5-aedf-99a4a071589b-StreamThread-1] State transition from STARTING to PARTITIONS_ASSIGNED
2022-02-15 20:05:54 INFO ConsumerCoordinator:844 - [Consumer clientId=dev1-5e3fab76-51c7-41b5-aedf-99a4a071589b-StreamThread-1-consumer, groupId=dev1] Setting offset for partition poids_garmin_brut-0 to the committed offset FetchPosition{offset=21, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[LAPTOP-J1JBHQUR:9092 (id: 0 rack: null)], epoch=0}}
2022-02-15 20:05:54 INFO StreamTask:240 - stream-thread [dev1-5e3fab76-51c7-41b5-aedf-99a4a071589b-StreamThread-1] task [0_0] Initialized
2022-02-15 20:05:54 INFO StreamTask:265 - stream-thread [dev1-5e3fab76-51c7-41b5-aedf-99a4a071589b-StreamThread-1] task [0_0] Restored and ready to run
2022-02-15 20:05:54 INFO StreamThread:882 - stream-thread [dev1-5e3fab76-51c7-41b5-aedf-99a4a071589b-StreamThread-1] Restoration took 30 ms for all tasks [0_0]
2022-02-15 20:05:54 INFO StreamThread:229 - stream-thread [dev1-5e3fab76-51c7-41b5-aedf-99a4a071589b-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
2022-02-15 20:05:54 INFO KafkaStreams:332 - stream-client [dev1-5e3fab76-51c7-41b5-aedf-99a4a071589b] State transition from REBALANCING to RUNNING
2022-02-15 20:05:54 INFO KafkaConsumer:2254 - [Consumer clientId=dev1-5e3fab76-51c7-41b5-aedf-99a4a071589b-StreamThread-1-consumer, groupId=dev1] Requesting the log end offset for poids_garmin_brut-0 in order to compute lag
2022-02-15 20:06:03 INFO Main:33 - Test22
2022-02-15 20:06:06 INFO Main:33 - Test23
我害怕可耻地找到我的麻烦的原因:
streams.start();
streams.close();
streams.start()
开始监听流。
但是 steams.close()
立即结束它并在一秒钟后关闭程序!
我在评论 streams.close()
声明时看到,如果在主题中注入新消息,来自 poids_garmin_brut
主题的第一条消息需要大约 20 秒才能被检测和显示,然后其他新注入的瞬间就来了。
我相信 属性
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
本想要求 Kafka 从头开始阅读主题,但事实并非如此。