无法覆盖lagom kafka参数
Unable to override lagom kafka parameters
我创建了一个普通的 java 项目并将 lagom kafka 客户端的所有依赖项放在 classpath 上,然后在源文件夹中我将 application.conf
application.conf
的内容
lagom.broker.kafka {
service-name = ""
brokers = "127.0.0.1:9092"
}
虽然 运行 应用程序 service-name = "" 应该被使用(这样我的代理路径就可以被使用,而不是发现),但是它没有工作
调试时我发现在 KafkaConfig class service-name 结果是 "kafka_native"。
我发现在创建 KafkaConfig 时,即将到来的 conf 对象在其来源中没有我的 application.conf
在此之后我尝试使用 vm 参数覆盖它们,如下所示:
-Dlagom.broker.kafka.service-name=""
-Dlagom.broker.kafka.brokers="127.0.0.1:9092"
-Dakka.kafka.consumer.kafka-clients.auto.offset.reset="earliest"
成功了。
有人可以解释为什么在应用程序 conf 中覆盖不起作用
这就是我订阅主题的方式
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import com.ameyo.ticketing.ticket.api.TicketingService;
import com.ameyo.ticketing.ticket.api.events.TicketEvent;
import com.lightbend.lagom.javadsl.api.broker.Topic;
import com.lightbend.lagom.javadsl.client.integration.LagomClientFactory;
import com.typesafe.config.ConfigFactory;
import akka.Done;
import akka.stream.javadsl.Flow;
/**
*
*/
public class Main {
public static void main(String[] args) {
String brokers = ConfigFactory.load().getString("lagom.broker.kafka.brokers");
System.out.println("Initial Value for Brokers " + brokers);
LagomClientFactory clientFactory = LagomClientFactory.create("legacy-system", Main.class.getClassLoader());
TicketingService ticketTingService = clientFactory.createClient(TicketingService.class,
URI.create("http://localhost:11000"));
Topic<TicketEvent> ticketEvents = ticketTingService.ticketEvents();
ticketEvents.subscribe().withGroupId("nya13").atLeastOnce(Flow.<TicketEvent> create().mapAsync(1, e -> {
System.out.println("kuch to aaya");
return CompletableFuture.completedFuture(Done.getInstance());
}));
try {
Thread.sleep(1000000000);
} catch (InterruptedException e1) {
}
}
}
将配置更改为
akka{
lagom.broker.kafka {
service-name = ""
brokers = "127.0.0.1:9092"
}
}
成功了
我创建了一个普通的 java 项目并将 lagom kafka 客户端的所有依赖项放在 classpath 上,然后在源文件夹中我将 application.conf
application.conf
的内容lagom.broker.kafka {
service-name = ""
brokers = "127.0.0.1:9092"
}
虽然 运行 应用程序 service-name = "" 应该被使用(这样我的代理路径就可以被使用,而不是发现),但是它没有工作
调试时我发现在 KafkaConfig class service-name 结果是 "kafka_native"。
我发现在创建 KafkaConfig 时,即将到来的 conf 对象在其来源中没有我的 application.conf
在此之后我尝试使用 vm 参数覆盖它们,如下所示:
-Dlagom.broker.kafka.service-name=""
-Dlagom.broker.kafka.brokers="127.0.0.1:9092"
-Dakka.kafka.consumer.kafka-clients.auto.offset.reset="earliest"
成功了。
有人可以解释为什么在应用程序 conf 中覆盖不起作用
这就是我订阅主题的方式
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import com.ameyo.ticketing.ticket.api.TicketingService;
import com.ameyo.ticketing.ticket.api.events.TicketEvent;
import com.lightbend.lagom.javadsl.api.broker.Topic;
import com.lightbend.lagom.javadsl.client.integration.LagomClientFactory;
import com.typesafe.config.ConfigFactory;
import akka.Done;
import akka.stream.javadsl.Flow;
/**
*
*/
public class Main {
public static void main(String[] args) {
String brokers = ConfigFactory.load().getString("lagom.broker.kafka.brokers");
System.out.println("Initial Value for Brokers " + brokers);
LagomClientFactory clientFactory = LagomClientFactory.create("legacy-system", Main.class.getClassLoader());
TicketingService ticketTingService = clientFactory.createClient(TicketingService.class,
URI.create("http://localhost:11000"));
Topic<TicketEvent> ticketEvents = ticketTingService.ticketEvents();
ticketEvents.subscribe().withGroupId("nya13").atLeastOnce(Flow.<TicketEvent> create().mapAsync(1, e -> {
System.out.println("kuch to aaya");
return CompletableFuture.completedFuture(Done.getInstance());
}));
try {
Thread.sleep(1000000000);
} catch (InterruptedException e1) {
}
}
}
将配置更改为
akka{
lagom.broker.kafka {
service-name = ""
brokers = "127.0.0.1:9092"
}
}
成功了