如何使用python创建事件总线?

b5buobof  于 2023-03-20  发布在  Python
关注(0)|答案(2)|浏览(196)

所以,我做了一个跟踪twitter关注者的脚本,脚本的大部分工作分两部分完成:一个函数track_user(old_followers: dict, screen_name: str) -> dict,其中dict包含新关注者和未关注者的信息;第二个函数从第一个函数中获取dict,解析它,并将其转换为有意义的文本。
我的困境是:我希望这个脚本的其他用户能够插入他们自己的独立于我自己的代码。
下面我给予一个例子

# My code

def track_user(old_followers: dict, screen_name: str) -> dict:
    final_dict = tracking_code_here()
    SomeEventBus.emit(TrackUserEvent(final_dict))
    return final_dict

# Some user's code (using my example above as an example)
def convert_tracking_to_text(event: TrackUserEvent): 
    perform_some_text_conversion(event.final_dict)
    return

# Maybe a decorator can be used in this case instead
MyScript.EventBus.AddListener(TrackUserEvent, convert_tracking_to_text)

希望这是有意义的。我只想让我的脚本发出其他人可以导入和侦听的事件。

k7fdbhmy

k7fdbhmy1#

我以前做了一个用于处理异步事件的library

# creates a main event manager
manager = EventManager()

# creates an event
track_user_event = manager.create_event("track_user_event")

async def track_user(old_followers: dict, screen_name: str) -> dict:
    final_dict = tracking_code_here()
    
    # emit event here
    await track_user_event(TrackUserEvent(finalDict)) # could also be await manager.raise_event("track_user_event");

    return final_dict

@track_user_event.as_callback()
async def convert_tracking_to_text(event: TrackUserEvent): 
    perform_some_text_conversion(event.final_dict)
    return

它离完美还很远,但我希望它能帮助你。

7uzetpgm

7uzetpgm2#

我接受了另一个答案,因为它很有用,而且确实回答了问题,但我改用了event-bus库。
例如

def do_query():
    # Announce that user query starting
    MAIN_BUS.emit(events.StartQueryEvent.EVENT_NAME)
    query_users_events = []

    for usr in USERS_TO_QUERY:
        this_user = some_query(usr)
        # Emit event for this specific user
        MAIN_BUS.emit(events.TwitterUserFollowingQuery.EVENT_NAME, this_user)

        # Keeping track of all queried users
        query_users_events.append(this_user)

    # Emit an event with data of all queried users
    MAIN_BUS.emit(events.FinishAllQueryEvents.EVENT_NAME, query_users_events)

在另一个文件里,

# Compare old followers/mutuals to current mutuals after all query events are completed
@bus.on(events.FinishAllQueryEvents.EVENT_NAME)
def keep_track(user_data: list[events.TwitterUserFollowingQuery]):
    some_logic_based_on_data(user_data)

Full example repo

相关问题