<h1> Websocket Test </h1>
<script>
const ws = new WebSocket('ws://localhost:4000')
ws.onopen = () => {
console.log('ws opened on browser')
ws.send('hello world')
}
ws.onmessage = (message) => {
console.log(`message received`, message.data)
}
</script>
如果你使用的是python 3.11,下面的函数应该像这样重写:
async def send_to_clients(self, message: str) -> None:
if self.clients:
logging.info("trying to send")
[await client.send(message) for client in self.clients]
from datetime import time
import schedule
import time
import socket
import threading
alarm_time = input()
my_string = str(alarm_time)
my_new_time = my_string.format("%H:%M %Z")
EADER = 64
PORT = xxxx
SERVER = 'xxxxxxxx'
ADDR = (SERVER, PORT)
FORMAT = 'utf-8'
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(ADDR)
if alarm_time is not None and alarm_time != 0:
print(f"Scheduled for: {my_new_time}")
else:
sys.exit()
def job():
client.connect(ADDR)
name_time = self.name_send + ' ' + my_date
message = name_time.encode(FORMAT)
msg_length = len(message)
send_length = str(msg_length).encode(FORMAT)
send_length += b' ' * (HEADER - len(send_length))
client.send(send_length)
client.send(message)
schedule.every().day.at(my_new_time).do(job)
while True:
schedule.run_pending()
time.sleep(1)
Not sure if this is any help tbh. But its a small tweak of a script I use with socket, subprocess etc to schedule them
2条答案
按热度按时间e5nqia271#
我在回答我自己的问题...
这是一个Python websockets服务器的工作示例,它每5秒向所有客户端发送一条消息。我写了这个,并设法让它工作,因为我在网上找不到一个这样的例子(2021年3月)
希望这对其他人有帮助,如果任何人有改进或更好的解决方案的建议,使用软件包,可能添加ssl支持或订阅类型的服务添加,请在评论或回答部分写。
要查看其工作情况,您可以使用这个简单的html文件作为客户端,然后打开检查器以查看Console日志中的传入消息。
如果你使用的是python 3.11,下面的函数应该像这样重写:
relj7zay2#