在用Python编写PyFlink代码时,我使用Flink 1.15.2
,并使用以下连接器从RabbitMQ获取消息:flink-sql-connector-rabbitmq-1.15.2.jar
然而,当我尝试使用此代码下沉到RabbitMQ时,请遵循以下链接:https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/rabbitmq/#installing-rabbitmq
stream.add_sink(RMQSink(
connection_config, # config for the RabbitMQ connection
'queueName', # name of the RabbitMQ queue to send messages to
SimpleStringSchema()))
我得到了以下错误跟踪:
File "/home/ali/.virtualenvs/LAB_920_log_parser_more_investigation-DQLOhTET/lib/python3.8/site-packages/grpc/_channel.py", line 826, in _next
raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.CANCELLED
details = "Multiplexer hanging up"
debug_error_string = "{"created":"@1662371359.807069114","description":"Error received from peer ipv6:[::1]:44295","file":"src/core/lib/surface/call.cc","file_line":966,"grpc_message":"Multiplexer hanging up","grpc_status":1}"
>
Traceback (most recent call last):
File "/home/ali/.virtualenvs/LAB_920_log_parser_more_investigation-DQLOhTET/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
response = task()
和更多日志:
RuntimeError: java.lang.UnsupportedOperationException: A serializer has already been registered for the state; re-registration is not allowed.
at org.apache.flink.runtime.state.StateSerializerProvider$EagerlyRegisteredStateSerializerProvider.registerNewSerializerForRestoredState(StateSerializerProvider.java:344)
at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.updateNamespaceSerializer(RegisteredKeyValueStateBackendMetaInfo.java:132)
谢谢你的帮助。
1条答案
按热度按时间zdwk9cvp1#
我通过将此Map添加到String中修复了此问题,因为原始数据流包含元组: