ApacheFlink有状态函数序列化问题?

gj3fmq9x  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(361)

我正试图在python中使用apache-flink有状态函数构建一个项目,但我似乎无法让它工作。我缩小了问题的范围,认为当我通过protobuf模式向有状态函数发送请求时,序列化程序似乎无法将消息序列化到我期望的类中。我想做的是:

import json
from statefun import StatefulFunctions, RequestReplyHandler
from jobs.session_event_pb2 import Event

functions = StatefulFunctions()

@functions.bind("namespace/funcname")
def funcname(context, session: Event):
    print("hello world")

handler = RequestReplyHandler(functions)

if __name__ == '__main__':
    inputFile = open("my_file.json", "r")
    for line in inputFile:
        data = json.loads(line).get('properties')
        if data is not None and data.get('prop1') is not None and data.get('prop2') is not None:
            request = Event()
            request.prop1 = data["prop1"]
            request.prop2 = data["prop2"]
            request = request.SerializeToString()
            handler(request)

这是我的协议模式:

syntax = "proto3";

package mypackage;

message Event {
    string prop1 = 1;
    string prop2 = 2;
}

我做错什么了?

woobm2wo

woobm2wo1#

这是因为requestreply处理程序不接受直接protobuf消息。flink运行时发送一个名为 ToFunction 并接收类型为的响应 FromFunction . 此负载包含调用方消息以及持久值和其他元信息。
如果不能直接调用函数,比如在测试中,我建议您这样做,而不要使用处理程序。

相关问题