Embedded Kafka starting with wrong number of partitions
I'm starting up an EmbeddedKafka instance in a JUnit test. I can correctly read the records I've pushed to the stream in my application, but one thing I've noticed is that each topic ends up with only one partition. Can anyone explain why?
In my application, I have the following:
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
This returns a list containing one item. When running against a local Kafka with 3 partitions, it returns a list with 3 items, as expected.
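For reference, the same check can be run directly against the embedded broker inside the test. A minimal sketch using spring-kafka-test's KafkaTestUtils (the group id "testGroup" is a placeholder, and testTopic is the topic under test):

@Test
public void partitionCountCheck() {
    // Consumer properties pointing at the embedded broker.
    Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "false", embeddedKafkaBroker);
    try (Consumer<String, String> consumer =
            new KafkaConsumer<>(consumerProps, new StringDeserializer(), new StringDeserializer())) {
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(testTopic);
        // Prints 1 against the embedded broker, even though partitions = 3 was requested.
        System.out.println("partitions: " + partitionInfos.size());
    }
}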
My test looks as follows:
@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka(partitions = 3)
@ActiveProfiles("inmemory")
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@TestPropertySource(
        locations = "classpath:application-test.properties",
        properties = {"app.onlyMonitorIfDataUpdated=true"})
public class MonitorRestKafkaIntegrationTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Value("${spring.embedded.kafka.brokers}")
    private String embeddedBrokers;

    @Autowired
    private WebApplicationContext wac;

    @Autowired
    private JsonUtility jsonUtility;

    private MockMvc mockMvc;

    @Before
    public void setup() {
        mockMvc = webAppContextSetup(wac).build();
        UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser("dummyUser"));
    }

    private ResultActions interactiveMonitoringREST(String eggID, String monitoringParams) throws Exception {
        return mockMvc.perform(post(String.format("/eggs/%s/interactive", eggID))
                .contentType(MediaType.APPLICATION_JSON_VALUE)
                .content(monitoringParams));
    }

    @Test
    @WithMockUser("super_user")
    public void testEmbeddedKafka() throws Exception {
        Producer<String, String> producer = getKafkaProducer();
        sendRecords(producer, 3);
        updateConn();
        interactiveMonitoringREST(EGG_KAFKA, monitoringParams)
                .andExpect(status().isOk())
                .andDo(print())
                .andExpect(jsonPath("$.taskResults[0].resultDetails.numberOfRecordsProcessed").value(3))
                .andExpect(jsonPath("$.taskResults[0].resultDetails.numberOfRecordsSkipped").value(0));
    }

    private void sendRecords(Producer<String, String> producer, int records) {
        for (int i = 0; i < records; i++) {
            String val = "{\"auto_age\":" + String.valueOf(i + 10) + "}";
            producer.send(new ProducerRecord<>(testTopic, String.valueOf(i), val));
        }
        producer.flush();
    }

    private Producer<String, String> getKafkaProducer() {
        Map<String, Object> prodConfigs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        return new DefaultKafkaProducerFactory<>(prodConfigs, new StringSerializer(), new StringSerializer()).createProducer();
    }

    private void updateConn() throws Exception {
        String conn = getConnectionREST(CONN_KAFKA).andReturn().getResponse().getContentAsString();
        ConnectionDetail connectionDetail = jsonUtility.fromJson(conn, ConnectionDetail.class);
        connectionDetail.getDetails().put(ConnectionDetailConstants.CONNECTION_SERVER, embeddedBrokers);
        String updatedConn = jsonUtility.toJson(connectionDetail);
        updateConnectionREST(CONN_KAFKA, updatedConn).andExpect(status().isOk());
    }
}
You need to tell the broker to create the topics in advance...
@SpringBootTest
@EmbeddedKafka(topics = "foo", partitions = 3)
class So57481979ApplicationTests {

    @Test
    void testPartitions(@Autowired KafkaAdmin admin) throws InterruptedException, ExecutionException {
        AdminClient client = AdminClient.create(admin.getConfig());
        Map<String, TopicDescription> map = client.describeTopics(Collections.singletonList("foo")).all().get();
        System.out.println(map.values().iterator().next().partitions().size());
    }

}
Or set the num.partitions broker property, if you want the broker to auto-create the topics for you on first use.
We should probably do that automatically, based on the partitions property.
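For example, a minimal sketch of the num.partitions approach (the class name is made up here, and num.partitions=3 is an assumption chosen to match the test above):

@SpringBootTest
@EmbeddedKafka(brokerProperties = "num.partitions=3")
class AutoCreateTopicTests {
    // With num.partitions=3 set on the embedded broker, a topic that is
    // auto-created on first use (e.g. by the first producer send) gets
    // 3 partitions instead of the broker default of 1.
}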
I found that the bootstrapServersProperty attribute of @EmbeddedKafka is important: it is used to populate the property in application-test.yml, which can then be used to create the listener containers.
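For example, a minimal sketch (the class name is made up; spring.kafka.bootstrap-servers is Spring Boot's standard property, so the auto-configured clients pick it up without extra wiring):

@SpringBootTest
@EmbeddedKafka(partitions = 3, bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class BootstrapPropertyTests {
    // The embedded broker writes its address into spring.kafka.bootstrap-servers,
    // so consumers and listener containers created by Spring Boot's
    // auto-configuration connect to the embedded broker automatically.
}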