python-3.x 如何用Tkinter显示动态字符串

tv6aics1  于 2023-04-08  发布在  Python
关注(0)|答案(1)|浏览(163)

我有一个自认为相当简单的问题要解决,但由于我的知识有限,一直没能让它按预期工作。
我有一个C#脚本,通过管道把字符串(一个单词)发送给下面粘贴的Python代码。这段代码原本会在每次收到新字符串时,把它显示在一个小型OLED显示屏上(我现在已经从代码中删除了这个功能)。现在我想改成使用Tkinter,在我显示器上的窗口中以图形方式显示这个字符串。
我下面的尝试会在收到字符串(message.decode())时打开一个窗口并显示该单词,但之后显示的内容不再变化,程序只是挂起。有人能建议我需要怎么做,才能让同一个窗口中的内容自动更新吗?

from board import SCL, SDA
import busio
from PIL import Image, ImageDraw, ImageFont
import adafruit_ssd1306
import socket
import sys
import struct

# importing whole module
from tkinter import *
from tkinter.ttk import *

# importing strftime function to
# retrieve system's time
from time import strftime

# Open the I2C bus on the SCL/SDA pins imported from `board`.
i2c = busio.I2C(SCL, SDA)

# Unix-domain stream socket used to talk to the pipe server.
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

# Filesystem path of the socket the server is listening on.
server_address = '/tmp/CoreFxPipe_testpipe'
print(f'connecting to {server_address}')

try:
    sock.connect(server_address)
except socket.error as err:
    # Could not reach the server: report the reason and exit with status 1.
    print(err)
    raise SystemExit(1)

# Receive loop: wait for a message on the socket, then show it in a Tk window.
try:
    word = ""  # NOTE(review): assigned but never read in this block
    while True:
        # amount_expected = struct.unpack('I', sock.recv(4))[0]
        # print("amount_expected :", amount_expected)

        # Blocks until data arrives; reads at most 128 bytes per call.
        # NOTE(review): recv() returns b'' once the peer closes the
        # connection, in which case this loop keeps spinning and never exits.
        message = sock.recv(128)
        if message:
            print("Received message : ", message.decode())  # bytes -> str

            # The Frame subclass is re-declared each time a message arrives.
            class Window(Frame):
                """Frame that fills its parent and shows the message in a Label."""
                def __init__(self, master=None):
                    Frame.__init__(self, master)
                    self.master = master
                    self.pack(fill=BOTH, expand=1)

                    # The label text is fixed when the widget is built; nothing
                    # in this block updates it afterwards.
                    text = Label(self, text=message.decode())
                    text.place(x=70, y=90)

            # A new Tk root is created for every received message.
            root = Tk()
            app = Window(root)
            root.wm_title("tkinter window")
            root.geometry("200x200")
            # mainloop() does not return until the window is closed, so the
            # loop cannot reach the next recv() while the window is showing.
            root.mainloop()



finally:
    # Runs however the loop ends, so the socket is always closed.
    print('closing socket...')
    sock.close()
piv4azn7

piv4azn71#

你可以在一个单独的Thread中运行socket consumer(接收器),并将它接收到的消息放入Queue中,然后使用tkinter的after方法在tkinter事件循环中消费消息。下面是一个例子:

...  # I'm leaving out your other imports for brevity's sake
import tkinter as tk
import tkinter.messagebox as tkm
from queue import Queue, Full, Empty
from threading import Thread, Event
from tkinter import ttk

class Root(tk.Tk):
    """Main window that displays the most recent message read from a socket.

    A background thread reads from the socket and hands each decoded message
    to the UI through a one-slot queue. The Tk event loop polls that queue
    with ``after`` and updates a label, so widgets are only ever touched
    from the main thread.

    The socket is connected to the module-level ``server_address``.
    """

    def __init__(self):
        """Build the window, connect the socket and start both loops.

        If the connection fails, an error dialog is shown and the window is
        destroyed, so a subsequent ``mainloop()`` call returns immediately.
        """
        super().__init__()  # initialize Tk
        self.title('My App!')
        self.geometry('200x200')
        # tell the app what to do when closing
        self.protocol('WM_DELETE_WINDOW', self.on_close)
        # this queue passes the msg from the worker to the main UI
        self.message_queue = Queue(maxsize=1)
        # this event flag is how we'll stop the thread worker on exit
        self.stop_event = Event()
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.text = ttk.Label(self, text='Waiting for a message...')
        self.text.pack(expand=True, fill='both')
        self.consumer_loop_id = None  # placeholder for the consumer 'after' ID
        self.worker_thread = None  # set by start_socket_worker()
        # this sets how quickly you want the consumer loop to check for updates
        self.loop_rate_ms = 100

        try:
            self.sock.connect(server_address)
        except socket.error as err:
            tkm.showerror('An Error Occurred', str(err))
            self.on_close()  # clean up and tear the window down
        else:
            self.start_socket_worker()  # let's go!
            self.message_consumer()  # start the consumer loop

    # this function will get run in its own thread
    @staticmethod
    def socket_worker(sock, message_queue, stop_event):
        """Receive messages and put them into a lossy queue.

        Runs until ``stop_event`` is set, the socket raises an error, or the
        peer closes the connection. In the last two cases ``stop_event`` is
        set by this function before it returns.

        Args:
            sock: Connected socket to read from (up to 128 bytes per read).
            message_queue: Queue receiving each decoded message. When it is
                full the new message is dropped.
            stop_event: Event used to request (and signal) shutdown.
        """
        # keep running until the stop_event flag is set
        while not stop_event.is_set():
            try:
                data = sock.recv(128)
            except socket.error:
                stop_event.set()  # bail
                break
            if not data:
                # recv() returns b'' once the peer has closed the connection;
                # without this check the loop would spin and queue '' forever.
                stop_event.set()
                break
            try:
                # 'replace' keeps the worker alive if a read ends in the
                # middle of a multi-byte character or the data isn't UTF-8.
                message_queue.put_nowait(data.decode(errors='replace'))
            except Full:  # last message hasn't been consumed yet...
                pass  # too bad, better luck next time

    def start_socket_worker(self):
        """Launch the worker thread."""
        self.worker_thread = Thread(
            name='Message Worker',  # not necessary, but helpful for debugging!
            target=self.socket_worker,
            args=(self.sock, self.message_queue, self.stop_event),
            # daemon: a worker still blocked in recv() must not keep the
            # process alive after the window has been closed
            daemon=True,
        )
        self.worker_thread.start()

    def message_consumer(self):
        """Get messages from the queue and update the UI.

        Reschedules itself every ``loop_rate_ms`` milliseconds and stores the
        ``after`` ID in ``consumer_loop_id`` so ``on_close`` can cancel it.
        """
        try:
            message = self.message_queue.get_nowait()
        except Empty:
            pass  # no message, nothing to do
        else:
            self.text.config(text=message)  # update the label with the new message
        finally:
            self.consumer_loop_id = self.after(
                self.loop_rate_ms,
                self.message_consumer  # call this method again to keep looping!
            )

    def on_close(self):
        """Stop the consumer loop and the worker, close the socket, destroy the window."""
        if self.consumer_loop_id is not None:
            self.after_cancel(self.consumer_loop_id)  # stop the consumer loop
            self.consumer_loop_id = None
        self.stop_event.set()  # stop the worker thread

        # close() alone does not wake a thread that is blocked in recv();
        # shutdown() does, and makes that recv() return immediately.
        try:
            self.sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # socket was never connected, or the peer is already gone
        self.sock.close()
        # destroy() rather than quit(): quit() has no effect when called
        # before mainloop() has started (the failed-connect path in __init__).
        self.destroy()

# Script entry point: only runs when the file is executed directly.
if __name__ == '__main__':
    app = Root()  # builds the window (Root.__init__ also connects the socket)
    app.mainloop()  # hand control to the Tk event loop

相关问题