如何将librdkafka负载转换为avro以获取参数值

idfiyjo8  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(299)

我正在尝试使用librdkafka库向kafka发布/订阅消息。我无法将邮件反序列化到avro。有人能帮我理解如何从有效载荷(rkm->payload)构造avro\u datum\t对象吗。
生产者代码片段

const char  PERSON_SCHEMA[] =
"{\"type\":\"record\",\
  \"name\":\"Person\",\
  \"fields\":[\
     {\"name\": \"ID\", \"type\": \"long\"},\
     {\"name\": \"First\", \"type\": \"string\"},\
     {\"name\": \"Last\", \"type\": \"string\"},\
     {\"name\": \"Phone\", \"type\": \"string\"},\
     {\"name\": \"Age\", \"type\": \"int\"}]}";

void init_schema(void)
{
        if (avro_schema_from_json_literal(PERSON_SCHEMA, &person_schema)) {
                fprintf(stderr, "Unable to parse person schema\n");
                exit(EXIT_FAILURE);
        }
}

avro_datum_t add_person(const char *first, const char *last, const char *phone, int32_t age)
{
        avro_datum_t person = avro_record(person_schema);

        avro_datum_t id_datum = avro_int64(++id);
        avro_datum_t first_datum = avro_string(first);
        avro_datum_t last_datum = avro_string(last);
        avro_datum_t age_datum = avro_int32(age);
        avro_datum_t phone_datum = avro_string(phone);

        if (avro_record_set(person, "ID", id_datum)
            || avro_record_set(person, "First", first_datum)
            || avro_record_set(person, "Last", last_datum)
            || avro_record_set(person, "Age", age_datum)
            || avro_record_set(person, "Phone", phone_datum)) {
                fprintf(stderr, "Unable to create Person datum structure\n");
                exit(EXIT_FAILURE);
        }
        return person;
}

/* Asynchronous produce */
err = rd_kafka_producev(
      rk,
      RD_KAFKA_V_TOPIC(topic),
      RD_KAFKA_V_KEY(user, strlen(key)),
      RD_KAFKA_V_VALUE(person, sizeof(person)),
      RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
      RD_KAFKA_V_OPAQUE(&delivery_counter),
                        RD_KAFKA_V_END);

在用户端,如何将rkm->payload反序列化为avro\u datum\t对象并提取参数。
a感谢您在这方面的帮助或指点。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题