PyFlink - RabbitMQ接收器:已经为该状态注册了序列化程序;不允许重新注册

fjaof16o  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(216)

在用Python编写PyFlink代码时,我使用Flink 1.15.2,并使用以下连接器从RabbitMQ获取消息:flink-sql-connector-rabbitmq-1.15.2.jar然而,当我尝试使用此代码下沉到RabbitMQ时,请遵循以下链接:https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/rabbitmq/#installing-rabbitmq

stream.add_sink(RMQSink(
    connection_config,      # config for the RabbitMQ connection
    'queueName',            # name of the RabbitMQ queue to send messages to
    SimpleStringSchema()))

我得到了以下错误跟踪:

File "/home/ali/.virtualenvs/LAB_920_log_parser_more_investigation-DQLOhTET/lib/python3.8/site-packages/grpc/_channel.py", line 826, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.CANCELLED
    details = "Multiplexer hanging up"
    debug_error_string = "{"created":"@1662371359.807069114","description":"Error received from peer ipv6:[::1]:44295","file":"src/core/lib/surface/call.cc","file_line":966,"grpc_message":"Multiplexer hanging up","grpc_status":1}"
>
Traceback (most recent call last):
  File "/home/ali/.virtualenvs/LAB_920_log_parser_more_investigation-DQLOhTET/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
    response = task()

和更多日志:

RuntimeError: java.lang.UnsupportedOperationException: A serializer has already been registered for the state; re-registration is not allowed.
    at org.apache.flink.runtime.state.StateSerializerProvider$EagerlyRegisteredStateSerializerProvider.registerNewSerializerForRestoredState(StateSerializerProvider.java:344)
    at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.updateNamespaceSerializer(RegisteredKeyValueStateBackendMetaInfo.java:132)

谢谢你的帮助。

zdwk9cvp

zdwk9cvp1#

我通过将此Map添加到String中修复了此问题,因为原始数据流包含元组:

ds_string = stream.map(lambda tuple: tuple[0]+str(tuple[0]), output_type=Types.STRING())

相关问题