librdkafka:rd\u kafka\u assignment为分配的分区返回偏移量-1001

6qfn3psc  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(724)

当我向消费者查询分配的主题分区列表时,结果中的所有分区的偏移量都是-1001。如果我打印出收到的消息的偏移量,偏移量被设置为正确的值。
这是我用来使用消息的代码:

static void print_partition_list(FILE* fp,
    const rd_kafka_topic_partition_list_t
    * partitions) {
    int i;
    for (i = 0; i < partitions->cnt; i++) {
        fprintf(fp, "%s %s [%d] offset %lld",
            i > 0 ? "," : "",
            partitions->elems[i].topic,
            partitions->elems[i].partition,
            partitions->elems[i].offset);
    }
    fprintf(fp, "\n");

}

static void rebalance_cb(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* partitions, void* opaque) {
    fprintf(stderr, "%% Consumer group rebalanced: ");
    switch (err) {
    case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
        fprintf(stderr, "assigned:\n");
        print_partition_list(stderr, partitions);
        rd_kafka_assign(rk, partitions);
        break;
    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
        fprintf(stderr, "revoked:\n");
        print_partition_list(stderr, partitions);
        rd_kafka_assign(rk, NULL);
        break;
    default:
        fprintf(stderr, "failed: %s\n", rd_kafka_err2str(err));
        rd_kafka_assign(rk, NULL);
        break;
    }
}

int main()
{

    rd_kafka_t* rk;
    rd_kafka_conf_t* conf;
    rd_kafka_resp_err_t err;

    char errstr[512];
    const char* brokers{ "localhost:9092" };
    const char* groupid{ "OffsetTest" };
    const char* topics[] = { "OffsetTesting" };

    rd_kafka_topic_partition_list_t* subscription;

    conf = rd_kafka_conf_new();

    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    if (rd_kafka_conf_set(conf, "group.id", groupid,
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    if (rd_kafka_conf_set(conf, "enable.auto.commit", "false",
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    if (rd_kafka_conf_set(conf, "offset.store.method", "broker",
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);

    rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr);
        return 1;
    }

    conf = NULL;

    rd_kafka_poll_set_consumer(rk);

    subscription = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(subscription, topics[0], RD_KAFKA_PARTITION_UA);

    err = rd_kafka_subscribe(rk, subscription);
    if (err) {
        fprintf(stderr,
            "%% Failed to subscribe to %d topics: %s\n",
            subscription->cnt, rd_kafka_err2str(err));
        rd_kafka_topic_partition_list_destroy(subscription);
        rd_kafka_destroy(rk);
        return 1;
    }

    fprintf(stderr,
        "%% Subscribed to %d topic(s), "
        "waiting for rebalance and messages...\n",
        subscription->cnt);

    rd_kafka_topic_partition_list_destroy(subscription);

    int runningCounter = 0;

    while (runningCounter != 10) {
        rd_kafka_message_t* rkm;

        rkm = rd_kafka_consumer_poll(rk, 100);
        if (!rkm) {
            Sleep(2000);
            runningCounter++;
            continue;
        }
        if (rkm->err) {
            fprintf(stderr,
                "%% Consumer error: %s\n",
                rd_kafka_message_errstr(rkm));
            rd_kafka_message_destroy(rkm);
            continue;
        }

        rd_kafka_topic_partition_list_t* list;
        err = rd_kafka_assignment(rk, &list);

        if (err) {
            fprintf(stderr,
                "%% Failed to subscribe to %d topics: %s\n",
                subscription->cnt, rd_kafka_err2str(err));
            rd_kafka_topic_partition_list_destroy(subscription);
            return 1;
        }

        print_partition_list(stderr, list);

        rd_kafka_topic_partition_list_destroy(list);

        printf("Message on %s [%d] at offset %lld:\n",
            rd_kafka_topic_name(rkm->rkt), rkm->partition,
            rkm->offset);

        if (rkm->key)
            printf(" Key: %.*s\n",
            (int)rkm->key_len, (const char*)rkm->key);
        else if (rkm->key)
            printf(" Key: (%d bytes)\n", (int)rkm->key_len);

        if (rkm->payload)
            printf(" Value: %.*s\n",
            (int)rkm->len, (const char*)rkm->payload);
        else if (rkm->key)
            printf(" Value: (%d bytes)\n", (int)rkm->len);

        rd_kafka_commit_message(rk, rkm, 0);

        rd_kafka_message_destroy(rkm);

        runningCounter++;
    }

    fprintf(stderr, "%% Closing consumer\n");
    rd_kafka_consumer_close(rk);

    rd_kafka_destroy(rk);

    return 0;

}

我知道这里有一个类似问题的答案,librdkafka:committed\u offset总是-1001,但这没有帮助。我将主题分区列表分配给 rebalance_cb .
更新:
这是示例2消息的输出:

> %4|1580198390.566|CONFWARN|rdkafka#consumer-1| [thrd:app]: Configuration property offset.store.method is deprecated: Offset commit store method: 'file' - DEPRECATED: local file store (offset.store.path, et.al), 'broker' - broker commit store (requires Apache Kafka 0.8.2 or later on the broker).

> % Subscribed to 1 topic(s), waiting for rebalance and messages...

> % Consumer group rebalanced: assigned:
> NewTestingTopic [0] offset -1001, NewTestingTopic [1] offset -1001, 
> NewTestingTopic [2] offset -1001, NewTestingTopic [3] offset -1001
> NewTestingTopic [0] offset -1001, NewTestingTopic [1] offset -1001, 
> NewTestingTopic [2] offset -1001, NewTestingTopic [3] offset -1001
> 
> Message on NewTestingTopic [0] at offset 25:
> Key: 0
> Value: ExampleMessage 0
> 
> NewTestingTopic [0] offset -1001, NewTestingTopic [1] offset -1001, 
> NewTestingTopic [2] offset -1001, NewTestingTopic [3] offset -1001
> 
> Message on NewTestingTopic [3] at offset 41:
> Key: 1
> Value: ExampleMessage 1
c6ubokkw

c6ubokkw1#

我相信这可能是故意的。
这个 rd_kafka_assignment() 方法返回通过 rd_kafka_assign() . 当消费者被分配到一个组中的分区时,分配只是一个分区列表,没有偏移量。
同样在java库中, assignment() 退货 Set<TopicPartition> ,这里也没有偏移。在利伯Kafka, rd_kafka_assignment() 给一个 rd_kafka_topic_partition_list_t ,类似于 Set<TopicPartition> . 主要区别在于它重用了 rd_kafka_topic_partition_t 有几个额外字段的类型,如 offset .
这个 rd_kafka_topic_partition_t 类型在很多地方都有使用,它的所有字段在所有上下文中都没有意义。赋值上下文就是这种情况,因此有些字段被设置为“空”值,这是 -1001 对于偏移量。
如果要获取赋值的当前偏移量,则需要使用 rd_kafka_position() . 同样,在java中,您可以使用 position() .

相关问题