Kafka Consumer isn't receiving any messages from its Producer
Below is my Python Kafka producer code. I'm not sure whether the messages are actually being published to the Kafka broker, because nothing arrives on the consumer side. My consumer Python program works fine when I test it against the producer console command.
from __future__ import print_function
import sys
from pyspark import SparkContext
from kafka import KafkaClient, SimpleProducer

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: spark-submit producer1.py <input file>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonRegression")

    def sendkafka(messages):
        ## Connect to the broker
        kafka = KafkaClient("localhost:9092")
        producer = SimpleProducer(kafka, async=True, batch_send_every_n=5,
                                  batch_send_every_t=10)
        send_counts = 0
        for message in messages:
            try:
                print(message)
                ## Set the topic name and push the message to the Kafka broker
                yield producer.send_messages('test', message.encode('utf-8'))
            except Exception as e:
                print("Error: %s" % str(e))
            else:
                send_counts += 1
        print("The count of prediction results which were sent IN THIS "
              "PARTITION is %d.\n" % send_counts)

    ## Connect and read the file
    rawData = sc.textFile(sys.argv[1])
    ## Find and skip the first (header) row
    dataHeader = rawData.first()
    data = rawData.filter(lambda x: x != dataHeader)
    ## Send every partition through sendkafka and collect the results
    sentRDD = data.mapPartitions(sendkafka)
    sentRDD.collect()
    ## Stop the Spark context
    sc.stop()
Here is my "Consumer" Python code:
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if len(sys.argv) < 3:
    print("Program that pulls messages from the Kafka brokers.")
    print("Usage: consume.py <zk> <topic>", file=sys.stderr)
else:
    ## Load settings from system properties, for launching with spark-submit
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ## Create a StreamingContext with a 10-second batch interval
    ssc = StreamingContext(sc, 10)
    ## Get everything after the Python script name
    zkQuorum, topic = sys.argv[1:]
    ## Create an input stream that pulls messages from the Kafka brokers
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer",
                                  {topic: 1})
    ## Keep only the message values
    lines = kvs.map(lambda x: x[1])
    ## Print the messages pulled from the Kafka brokers
    lines.pprint()
    ## Save the pulled messages as files
    ## lines.saveAsTextFiles("OutputA")
    ## Start receiving data and processing it
    ssc.start()
    ## Wait for the termination of the context
    ssc.awaitTermination()
I usually debug issues like this by using kafka-console-consumer (part of Apache Kafka) to consume from the topic you were trying to produce to. If the console consumer gets the messages, you know they arrived at Kafka.
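For example, with the ZooKeeper-based console consumer that shipped with Kafka releases of that era (assuming ZooKeeper runs locally on its default port 2181, and using the 'test' topic from the producer code above):

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
    --topic test --from-beginning

The --from-beginning flag makes it dump everything already stored in the topic, not just messages that arrive after it starts.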
If you run the producer first, let it finish, and then start the consumer, the problem may be that the consumer starts from the end of the log and waits for additional messages. Either make sure you start the consumer first, or configure it to start from the beginning automatically (sorry, not sure how to do that with your Python client).
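A minimal sketch of how that could look for the Spark receiver in the question: KafkaUtils.createStream accepts a kafkaParams dict, and setting auto.offset.reset to "smallest" makes a consumer group with no committed offsets start from the earliest available offset (the parameter value assumes the old ZooKeeper-based consumer configuration; the group id here is hypothetical):

## "smallest" only applies when the group has no offsets committed yet,
## so use a fresh group id to re-read the topic from the start.
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer-2",
                              {topic: 1},
                              kafkaParams={"auto.offset.reset": "smallest"})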
You can check the number of messages in the topic and see whether it increases with your produce requests:
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list <Kafka_broker_hostname>:<broker_port> --topic Que1 \
    --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
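For reference, GetOffsetShell prints one topic:partition:offset line per partition, which is why the awk pipeline above sums the third ":"-separated field. The sample output below is purely illustrative:

Que1:0:42
Que1:1:37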
If the message count keeps increasing, the Producer is working fine.
Well, I think there is something wrong with my local Zookeeper or Kafka, because when I tested the same code on another server it worked fine. Anyway, thanks to everyone who replied ;)