TypeError: Object of type 'mappingproxy' is not JSON serializable

wrrgggsh  posted on 2021-06-04  in Kafka

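I am trying to publish Avro messages with the AvroProducer from confluent-kafka-python, using the Schema Registry. However, the code fails to serialize the enum type. The code and the error trace are below. Any help is much appreciated.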

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
from example_schema.schema_classes import SCHEMA as value_schema
from example_schema.com.acme import *
import json

def function():
    avroProducer = AvroProducer({ 'bootstrap.servers': 'localhost:9092', 'schema.registry.url': 'http://localhost:8081' },  default_value_schema=value_schema)
    print(avroProducer)
    obj = Test()
    obj.name = 'vinay'
    obj.age = 11
    obj.sex = 'm'
    obj.myenum = Suit.CLUBS
    print(str(obj))
    avroProducer.produce(topic='test_topic',value=obj)
    avroProducer.flush()

function()

  File "main.py", line 16, in function
    avroProducer.produce(topic='test_topic',value=json.dumps(obj))
  File "/home/priv/anaconda3/lib/python3.6/site-packages/confluent_kafka/avro/__init__.py", line 80, in produce
    value = self._serializer.encode_record_with_schema(topic, value_schema, value)
  File "/home/priv/anaconda3/lib/python3.6/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 105, in encode_record_with_schema
    schema_id = self.registry_client.register(subject, schema)
  File "/home/priv/anaconda3/lib/python3.6/site-packages/confluent_kafka/avro/cached_schema_registry_client.py", line 216, in register
    body = {'schema': json.dumps(avro_schema.to_json())}
  File "/home/priv/anaconda3/lib/python3.6/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/home/priv/anaconda3/lib/python3.6/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/home/priv/anaconda3/lib/python3.6/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/home/priv/anaconda3/lib/python3.6/json/encoder.py", line 180, in default
    o.__class__.__name__)
TypeError: Object of type 'mappingproxy' is not JSON serializable

Avro schema:

{
    "type": "record",
    "name": "Test",
    "namespace": "com.acme",
    "fields": [{
            "name": "name",
            "type": "string"
        }, {
            "name": "age",
            "type": "int"
        }, {
            "name": "sex",
            "type": "string"
        }, {
            "name": "myenum",
            "type": ["null", {
                    "type": "enum",
                    "name": "Suit",
                    "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
                }
            ]
        }
    ]
}

ttcibm8c #1

Since you are using AvroProducer, don't call json.dumps on anything.
If you look at the producer example, the object being sent is a dictionary, not a JSON string:
https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro-cli.py
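
As a minimal sketch of that idea, the record can be built as a plain dictionary whose keys match the schema fields, with the enum field carried as its symbol string. The names `build_test_record` and `publish` are hypothetical helpers, and the broker and registry URLs are the ones from the question. Passing a dict removes `json.dumps` from your own code; it does not change how the library registers the schema.

```python
def build_test_record(name, age, sex, suit=None):
    """Return a dict shaped like the com.acme.Test schema from the question.

    `suit` is the enum symbol as a string (e.g. "CLUBS"), or None for the
    "null" branch of the union.
    """
    return {"name": name, "age": age, "sex": sex, "myenum": suit}


def publish(record, value_schema):
    # Imported here so the helper above can be used without Kafka installed.
    from confluent_kafka.avro import AvroProducer

    producer = AvroProducer(
        {
            "bootstrap.servers": "localhost:9092",
            "schema.registry.url": "http://localhost:8081",
        },
        default_value_schema=value_schema,
    )
    # Pass the dict itself; do not wrap it in json.dumps().
    producer.produce(topic="test_topic", value=record)
    producer.flush()


record = build_test_record("vinay", 11, "m", "CLUBS")
print(record)
```

Calling `publish(record, value_schema)` with the `value_schema` imported in the question would then send the dictionary as the message value.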


hmae6n7t #2

If downgrading avro-python3 from 1.9.0 to 1.8.2 is not an option for you, you may have to give up the enum type and use string instead.

"type": ["null", {
        "type": "enum",
        "name": "Suit",
        "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
    }
]

becomes

"type": ["null", "string"]
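
With the field declared as a string, the schema no longer rejects unknown symbols, so that check moves into application code. A minimal sketch, assuming a hand-written `Suit` enum and a hypothetical `suit_to_field` helper (neither is part of the generated `schema_classes` module from the question):

```python
from enum import Enum


class Suit(Enum):
    SPADES = "SPADES"
    HEARTS = "HEARTS"
    DIAMONDS = "DIAMONDS"
    CLUBS = "CLUBS"


def suit_to_field(value):
    """Convert a Suit, a symbol string, or None into the value for `myenum`."""
    if value is None:
        return None  # "null" branch of the union
    if isinstance(value, Suit):
        return value.value
    # Suit(...) raises ValueError for strings that are not valid symbols.
    return Suit(value).value


print(suit_to_field(Suit.CLUBS))
```

The result of `suit_to_field` is what would be stored under the `myenum` key of the record dictionary before producing it.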

i7uq4tfw #3

According to this link, the confluent-kafka-python API has a compatibility issue with avro-python3 1.9.0. The solution that worked for me was to downgrade avro-python3 from 1.9.0 to 1.8.2.
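
The last frames of the traceback in the question show `json.dumps(avro_schema.to_json())` failing, which suggests that the schema's JSON form contains a read-only mapping. The error itself can be reproduced with the standard library alone. In this sketch `schema_like` is a made-up stand-in, not the actual output of avro-python3, and `to_plain` is a hypothetical helper shown only to illustrate the type mismatch:

```python
import json
from collections.abc import Mapping
from types import MappingProxyType

# Stand-in for a schema whose JSON form contains a read-only mapping.
schema_like = {
    "type": "enum",
    "name": "Suit",
    "props": MappingProxyType({"symbols": ["SPADES", "HEARTS"]}),
}

try:
    json.dumps(schema_like)
except TypeError as exc:
    # json only knows how to encode dict, not other mapping types.
    error_message = str(exc)
    print(error_message)


def to_plain(obj):
    """`default` hook for json.dumps: copy any non-dict mapping into a dict."""
    if isinstance(obj, Mapping):
        return dict(obj)
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


encoded = json.dumps(schema_like, default=to_plain)
print(encoded)
```

The failing `json.dumps` call sits inside the library, so a `default` hook like this cannot be passed to it from user code; pinning avro-python3 to 1.8.2 as described above sidesteps the problem without patching anything.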
