如何通过 Web 服务器作为中间层将 Kafka 消费者集成到移动应用程序?

How to integrate Kafka consumer to mobile app through a web server as an intermediate layer?

我正在开发一个移动应用程序,当 kafka 生产者发送一个通知时,该应用程序会出现一个通知。我正在使用 kafka-python 框架来使用消息。我不确定如何将消费者代码集成到我的移动应用程序中。 我不太确定如何在后端使用 websocket 并将其集成到移动应用程序中。 到目前为止我正在尝试这个:

kafka生产者:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

consumer.py:

from kafka import KafkaConsumer
from flask import Flask
import json

new_msg = ""
app = Flask(__name__)
@app.route('/')
def hello_world():
     return new_msg

consumer = KafkaConsumer('Hello-Kafka',bootstrap_servers='localhost:9092')
for msg in consumer:
    if(len(msg.value.decode("utf-8"))!=0):
        new_msg = msg.value.decode("utf-8")
        app.run()

kafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))

mobileapp.swift:

import UIKit

class ViewController: UIViewController {

    let myNotification = Notification.Name(rawValue:"MyNotification")

    override func viewDidLoad() {
        super.viewDidLoad()

        let nc = NotificationCenter.default
        nc.addObserver(forName:myNotification, object:nil, queue:nil, using:catchNotification)
    }

    override func viewDidAppear(_ animated: Bool) {
        super.viewDidAppear(animated)
        let nc = NotificationCenter.default
        nc.post(name:myNotification,
                object: nil,
                userInfo:["message":"Hello there!", "date":Date()])
    }

    func catchNotification(notification:Notification) -> Void {
        print("Catch notification")

        guard let userInfo = notification.userInfo,
            let message  = userInfo["message"] as? String,
            let date     = userInfo["date"]    as? Date else {
                print("No userInfo found in notification")
                return
        }

        let alert = UIAlertController(title: "Notification!",
                                      message:"\(message) received at \(date)",
            preferredStyle: UIAlertControllerStyle.alert)
        alert.addAction(UIAlertAction(title: "OK", style: UIAlertActionStyle.default, handler: nil))
        self.present(alert, animated: true, completion: nil)
    }
}

现在,我只能在从 kafka 消费后向浏览器发送一条消息。如何将它集成到移动应用程序中,以便我可以将我使用的每条消息发送到该应用程序?

您可能需要考虑使用 Kafka 的现有 proxy/gateway 实现之一到 websockets,例如此处来自 Microsoft 的 websocket 代理 https://github.com/Microsoft/kafka-proxy-ws or the Akka-Http WebSocket based service from Landoop here https://github.com/Landoop/kafka-ws