如何在WebSocket中创建主题,如stomp或其他协议golang

ubby3x7f  于 12个月前  发布在  Go
关注(0)|答案(2)|浏览(149)

如何在golang中创建主题并发送WebSocket消息?我正在使用gorilla/WebSocket库来创建我的WebSocket,但我想创建主题来发送和接收消息,有人对如何做到这一点有任何想法吗?我将在下面留下我的代码:

package main

import (
    "fmt"
    "log"
    "net/http"

    "github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
}

func main() {
    http.HandleFunc("/gs-guide-websocket", wsHandler)
    log.Fatal(http.ListenAndServe("x.x.x.x:8080", nil))

    // Fecha a conexão

}

func wsHandler(w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Println(err)
        return
    }

    defer conn.Close()

    for {
        messageType, message, err := conn.ReadMessage()
        if err != nil {
            log.Println(err)
            return
        }

        fmt.Println("Received message:", string(message))

        err = conn.WriteMessage(messageType, message)

        if err != nil {
            log.Println(err)
            return
        }

    }
}

字符串
非常感谢你的帮助,谁能帮忙

bqf10yzr

bqf10yzr1#

WebSocket不是消息队列协议,所以没有“topics”的概念,它需要客户端通过HTTP协议主动向服务器请求,然后双方协商升级到一个WebSocket连接,你可以保持这个连接,然后直接向对方发送任何消息。

f5emj3cl

f5emj3cl2#

[解决]
你可以给一个mqtt发一条消息:
伺服器:

package main

import (
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    mqtt "github.com/mochi-mqtt/server/v2"
    "github.com/mochi-mqtt/server/v2/hooks/auth"
    "github.com/mochi-mqtt/server/v2/listeners"
)

func main() {
    // Create signals channel to run server until interrupted
    sigs := make(chan os.Signal, 1)
    done := make(chan bool, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sigs
        done <- true
    }()

    opts := mqtt.Options{
        InlineClient: true,
    }
    // Create the new MQTT Server.
    server := mqtt.New(&opts)

    // Allow all connections.
    _ = server.AddHook(new(auth.AllowHook), nil)

    // Create a TCP listener on a standard port.
    tcp := listeners.NewTCP("t1", "0.0.0.0:1883", nil)
    err := server.AddListener(tcp)
    if err != nil {
        log.Fatal(err)
    }

    go func() {
        err := server.Serve()
        if err != nil {
            log.Fatal(err)
        }
    }()

    go func() {
        for range time.Tick(time.Second * 5) {
            err := server.Publish("topic/test", []byte("Hello World!\n"), false, 0)
            if err != nil {
                server.Log.Error("server.Publish", "error", err)
            }
            server.Log.Info("main.go issued direct message to direct/publish")
        }
    }()

    // Run server until interrupted
    <-done

    // Cleanup
}

字符串
委托方:

package main

import (
    "fmt"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

// var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
//  fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
// }

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
    fmt.Println("Connected")
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
    fmt.Printf("Connect lost: %v", err)
}

func main() {
    var broker = "localhost"
    var port = 1883
    opts := mqtt.NewClientOptions()
    opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
    //opts.SetClientID("go_mqtt_client")
    //opts.SetUsername("username")
    //opts.SetPassword("password")
    // opts.SetDefaultPublishHandler(messagePubHandler)
    opts.OnConnect = connectHandler
    opts.OnConnectionLost = connectLostHandler
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    sub(client)
    // Keep the server running
    select {}
    // publish(client)

    //client.Disconnect(250)
}

func publish(client mqtt.Client) {
    num := 10
    for i := 0; i < num; i++ {
        text := fmt.Sprintf("Message %d", i)
        token := client.Publish("topic/test", 0, false, text)
        token.Wait()
        time.Sleep(time.Second)
    }
}

func sub(client mqtt.Client) {
    topic := "topic/test"
    token := client.Subscribe(topic, 1, func(c mqtt.Client, m mqtt.Message) {
        fmt.Printf("Mensagens: %s", m.Payload())
    })
    token.Wait()
    fmt.Printf("Subscribed to topic: %s", topic)
}


你能给跺脚人发个信息吗:

package main

import (
    "fmt"
    "log"

    "github.com/go-stomp/stomp"
)

func main() {
    // Endereço do servidor STOMP
    serverAddress := "localhost:61613"

    // Conectar ao servidor STOMP
    conn, err := stomp.Dial("tcp", serverAddress)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Disconnect()

    // Destino para enviar mensagens
    destination := "/topic/greetings"

    // Corpo da mensagem
    messageBody := "Hello, STOMP!"

    // Subscrever para receber mensagens
    sub, err := conn.Subscribe(destination, stomp.AckAuto)
    if err != nil {
        println(err)
    }
    defer sub.Unsubscribe()

    // Enviar uma mensagem
    err = conn.Send(destination, "text/plain", []byte(messageBody), nil)
    if err != nil {
        println(err)
    }

    // Aguardar por mensagens
    for {
        msg := <-sub.C
        fmt.Printf("Mensagem recebida: %s", msg.Body)
    }
}
package main

import (
    "github.com/go-stomp/stomp/server"
)


func main() {

    err := server.ListenAndServe("localhost:61613")

    if err != nil {
        panic(err)
    }

}

的字符串
感谢@无名6849和@布里茨

相关问题