我如何在另一个 spark 集群的不同集群中创建 Kafka 主题 运行?
How can i create Kafka topic running in different cluster from another spark cluster?
我有两个集群 运行 Kafka 和 spark 分别。我想从 spark 集群创建一个 kafka-topic。我注意到要创建一个主题,我们需要调用 Kafka-topics.sh,它在 spark 集群中不可用。命令应通过 shell.
调用
例如:/kafka_topics.sh --zookeeper :2181 --create --topic test_topic
这个脚本应该从 spark 集群调用,它应该在 Kafka 集群上执行。
谁能帮帮我?
您可以使用 java api 和 Maven 依赖项(kafka 和 zookeeper)来创建 kafka 主题,如下所示。您可以从提交 spark 应用程序的代码调用代码。
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
</dependency>
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import kafka.utils.ZKStringSerializer$;
import kafka.admin.AdminUtils;
public final class KafkaUtils {
public static void main(String[] args) throws Exception {
KafkaUtils.createTopic("x.x.x.x:2181,y.y.y.y:2181", "topicName", 1, 0, new Properties());
}
public static void createTopic(String zkHosts, String topicName, int numberOfPartition, int replicationFactor, Properties properties) {
ZkClient zkClient = null;
try {
zkClient = getZkClient(zkHosts);
AdminUtils.createTopic(zkClient, topicName, numberOfPartition, replicationFactor, properties);
} catch (Exception exception) {
exception.printStackTrace();
} finally {
if (zkClient != null) {
try {
zkClient.close();
} catch (ZkInterruptedException ex) {
ex.printStackTrace();
}
}
}
}
private static ZkClient getZkClient(String zkHosts) {
ZkClient zkClient = null;
// Zookeeper sessionTimeoutMs
final int sessionTimeoutMs = 10000;
// Zookeeper connectionTimeoutMs
final int connectionTimeoutMs = 10000;
zkClient = new ZkClient(zkHosts, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$);
return zkClient;
}
}
这里x.x.x.x和y.y.y.y是kafka的zk集群主机。希望这有帮助。
我有两个集群 运行 Kafka 和 spark 分别。我想从 spark 集群创建一个 kafka-topic。我注意到要创建一个主题,我们需要调用 Kafka-topics.sh,它在 spark 集群中不可用。命令应通过 shell.
调用例如:/kafka_topics.sh --zookeeper :2181 --create --topic test_topic
这个脚本应该从 spark 集群调用,它应该在 Kafka 集群上执行。 谁能帮帮我?
您可以使用 java api 和 Maven 依赖项(kafka 和 zookeeper)来创建 kafka 主题,如下所示。您可以从提交 spark 应用程序的代码调用代码。
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
</dependency>
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import kafka.utils.ZKStringSerializer$;
import kafka.admin.AdminUtils;
public final class KafkaUtils {
public static void main(String[] args) throws Exception {
KafkaUtils.createTopic("x.x.x.x:2181,y.y.y.y:2181", "topicName", 1, 0, new Properties());
}
public static void createTopic(String zkHosts, String topicName, int numberOfPartition, int replicationFactor, Properties properties) {
ZkClient zkClient = null;
try {
zkClient = getZkClient(zkHosts);
AdminUtils.createTopic(zkClient, topicName, numberOfPartition, replicationFactor, properties);
} catch (Exception exception) {
exception.printStackTrace();
} finally {
if (zkClient != null) {
try {
zkClient.close();
} catch (ZkInterruptedException ex) {
ex.printStackTrace();
}
}
}
}
private static ZkClient getZkClient(String zkHosts) {
ZkClient zkClient = null;
// Zookeeper sessionTimeoutMs
final int sessionTimeoutMs = 10000;
// Zookeeper connectionTimeoutMs
final int connectionTimeoutMs = 10000;
zkClient = new ZkClient(zkHosts, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$);
return zkClient;
}
}
这里x.x.x.x和y.y.y.y是kafka的zk集群主机。希望这有帮助。