如何通过网络服务器作为中间层将kafka消费者与移动应用程序集成?

mhd8tkvw  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(382)

我正在开发一个移动应用程序,当Kafka制作人发送一个通知时,会出现一个通知。我正在使用kafka python框架来使用消息。我不知道如何将消费者代码集成到我的移动应用程序中。我不太清楚如何在后端使用websocket并将其集成到移动应用程序中。到目前为止,我一直在尝试:
Kafka制作人:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

消费者.py:

from kafka import KafkaConsumer
from flask import Flask
import json

new_msg = ""
app = Flask(__name__)
@app.route('/')
def hello_world():
     return new_msg

consumer = KafkaConsumer('Hello-Kafka',bootstrap_servers='localhost:9092')
for msg in consumer:
    if(len(msg.value.decode("utf-8"))!=0):
        new_msg = msg.value.decode("utf-8")
        app.run()

kafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))

手机app.swift:

import UIKit

class ViewController: UIViewController {

    let myNotification = Notification.Name(rawValue:"MyNotification")

    override func viewDidLoad() {
        super.viewDidLoad()

        let nc = NotificationCenter.default
        nc.addObserver(forName:myNotification, object:nil, queue:nil, using:catchNotification)
    }

    override func viewDidAppear(_ animated: Bool) {
        super.viewDidAppear(animated)
        let nc = NotificationCenter.default
        nc.post(name:myNotification,
                object: nil,
                userInfo:["message":"Hello there!", "date":Date()])
    }

    func catchNotification(notification:Notification) -> Void {
        print("Catch notification")

        guard let userInfo = notification.userInfo,
            let message  = userInfo["message"] as? String,
            let date     = userInfo["date"]    as? Date else {
                print("No userInfo found in notification")
                return
        }

        let alert = UIAlertController(title: "Notification!",
                                      message:"\(message) received at \(date)",
            preferredStyle: UIAlertControllerStyle.alert)
        alert.addAction(UIAlertAction(title: "OK", style: UIAlertActionStyle.default, handler: nil))
        self.present(alert, animated: true, completion: nil)
    }
}

现在,我只能发送一条消息到浏览器后,消费Kafka。如何将其集成到移动应用程序中,以便我可以将我消费的每条消息发送到该应用程序中?

2skhul33

2skhul331#

您可能需要考虑使用kafka到websocket的现有代理/网关实现之一,例如来自microsoft的websocket代理https://github.com/microsoft/kafka-proxy-ws 或者来自landoop的基于akkahttpwebsocket的服务https://github.com/landoop/kafka-ws

相关问题