OpenWhisk action times out when sending a message to Kafka

Posted by i7uaboj4 on 2021-07-15 in Kafka

Environment details

CentOS 7, standalone OpenWhisk

Problem description

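I want to send a message to Kafka from an OpenWhisk action. The data flow is: wsk CLI -> OpenWhisk action -> Kafka console consumer.
The action fails intermittently. For example, when I send "test01" through "test06", only "test02", "test04", and "test06" arrive.
According to the logs, the failed invocations time out.
This is my action script: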

// js
const kafka = require('kafka-node');

const Producer = kafka.Producer;
const client = new kafka.KafkaClient({
    kafkaHost: "192.168.68.132:3093"
});

const producer = new Producer(client);

function main(actionObj) {
    var message = JSON.stringify(actionObj);
    const payloads = [
        {
            topic: 'beforeReductionTopic',
            messages: message,
            partition: 0
        }
    ];
    return new Promise(function (resolve, reject) {
        producer
        .on('ready', () => {
            producer.send(payloads, function (err, data) {
                if (err) {
                    console.log("++++++ producer err: ", err);
                    return reject(err);
                } else {
                    console.log("++++++ producer data: ", data);
                    return resolve(data);
                }
            });
        })
        .on("error", err => {
            console.error(err);
            return reject(err);
        });
    });
}

exports.main = main;

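According to the logs, this is an "action developer error".
I tested the action script locally with the code below, and Kafka received all of the messages.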

const action = require("./openwhisk-kafka.js");
for(let i = 0; i < 3; i++) {
    action.main("ccc");
}
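
One possible reason the local test does not reproduce the failure (an assumption, not something the logs confirm): the loop calls main three times in the same tick, so every 'ready' listener is attached before the producer has connected. If OpenWhisk calls main after the module-level producer has already emitted 'ready', a listener attached that late never runs. The sketch below only illustrates that event-ordering behavior with a plain Node.js EventEmitter as a hypothetical stand-in for the producer; it does not talk to Kafka.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for the module-level producer; this is not kafka-node.
const fakeProducer = new EventEmitter();
const calls = [];

// Mimics the listener registration inside main(): the call is recorded
// only if 'ready' fires after the listener was attached.
function attach(label) {
    fakeProducer.on('ready', () => calls.push(label));
}

// Local test pattern: all calls attach before the connection is ready.
attach('local-1');
attach('local-2');
attach('local-3');

// The connection becomes ready exactly once.
fakeProducer.emit('ready');

// Late-call pattern: main() runs after 'ready' has already fired.
attach('late');

// 'late' is never recorded because its listener missed the event.
console.log(calls);
```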

I don't know how to modify the action script to fix this.
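
One direction to try, sketched under assumptions: check whether the producer is already ready before waiting for the 'ready' event, and use once() so that listeners do not accumulate across invocations. `FakeProducer` and `whenReady` are hypothetical names, and the `ready` flag on the stand-in is an assumption about how readiness could be tracked; whether the kafka-node version in use exposes an equivalent flag should be checked before relying on it.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for a producer that emits 'ready' once and
// remembers that it did so in a `ready` flag. Not kafka-node itself.
class FakeProducer extends EventEmitter {
    constructor() {
        super();
        this.ready = false;
    }
    connect() {
        this.ready = true;
        this.emit('ready');
    }
}

// Resolve immediately when the producer is already ready; otherwise wait
// for the first 'ready' or 'error' event and remove the other listener.
function whenReady(producer) {
    if (producer.ready) {
        return Promise.resolve(producer);
    }
    return new Promise((resolve, reject) => {
        const onReady = () => {
            producer.removeListener('error', onError);
            resolve(producer);
        };
        const onError = (err) => {
            producer.removeListener('ready', onReady);
            reject(err);
        };
        producer.once('ready', onReady);
        producer.once('error', onError);
    });
}
```

Inside main, the send call would then go in `whenReady(producer).then(...)` instead of inside a fresh `.on('ready', ...)` handler, so an invocation that arrives after the connection is established no longer waits for an event that has already passed.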

Log details

Record of invoking the OpenWhisk action with the wsk CLI

[root@localhost openwhisk-kafka]# wsk action update scicatTest --kind nodejs:10 /test/actions/openwhisk-kafka/openwhisk-kafka.zip 
ok: updated action scicatTest
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test01"
ok: invoked /_/scicatTest, but the request has not yet finished, with id 78e633791c114d29a633791c11fd29c2
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test02"
{
    "beforeReductionTopic": {
        "0": 57
    }
}
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test03"
ok: invoked /_/scicatTest, but the request has not yet finished, with id 09165a45a0f94011965a45a0f920110b
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test04"
{
    "beforeReductionTopic": {
        "0": 58
    }
}
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test05"
ok: invoked /_/scicatTest, but the request has not yet finished, with id 8a9f38f9d5ab4fa49f38f9d5ab6fa45a
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test06"
{
    "beforeReductionTopic": {
        "0": 59
    }
}
[root@localhost openwhisk-kafka]#

Log of a failed activation

[root@localhost openwhisk-kafka]# wsk activation get 8a9f38f9d5ab4fa49f38f9d5ab6fa45a
ok: got activation 8a9f38f9d5ab4fa49f38f9d5ab6fa45a
{
    "namespace": "guest",
    "name": "scicatTest",
    "version": "0.0.4",
    "subject": "guest",
    "activationId": "8a9f38f9d5ab4fa49f38f9d5ab6fa45a",
    "start": 1619540684355,
    "end": 1619540744418,
    "duration": 60063,
    "statusCode": 0,
    "response": {
        "status": "action developer error",
        "statusCode": 0,
        "success": false,
        "result": {
            "error": "The action exceeded its time limits of 60000 milliseconds."
        }
    },
    "logs": [],
    "annotations": [
        {
            "key": "path",
            "value": "guest/scicatTest"
        },
        {
            "key": "waitTime",
            "value": 652
        },
        {
            "key": "kind",
            "value": "nodejs:10"
        },
        {
            "key": "timeout",
            "value": true
        },
        {
            "key": "limits",
            "value": {
                "concurrency": 1,
                "logs": 10,
                "memory": 256,
                "timeout": 60000
            }
        }
    ],
    "publish": false
}
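
The fields that matter in this record can be read off programmatically. The sketch below works on a trimmed copy of the record above; `annotation` and `summarizeTimeout` are hypothetical helper names, not part of the wsk CLI. Note also that the `logs` array in the record is empty, which suggests that neither the send callback nor the error handler in the action ever logged anything during this activation.

```javascript
// Trimmed copy of the activation record shown above.
const activation = {
    duration: 60063,
    response: { status: 'action developer error', success: false },
    logs: [],
    annotations: [
        { key: 'timeout', value: true },
        { key: 'limits', value: { concurrency: 1, logs: 10, memory: 256, timeout: 60000 } }
    ]
};

// Look up an annotation value by key; returns undefined when the key is absent.
function annotation(record, key) {
    const entry = record.annotations.find(a => a.key === key);
    return entry ? entry.value : undefined;
}

// Hypothetical helper: report whether the activation hit its time limit.
function summarizeTimeout(record) {
    const limits = annotation(record, 'limits') || {};
    return {
        timedOut: annotation(record, 'timeout') === true,
        durationMs: record.duration,
        limitMs: limits.timeout
    };
}

console.log(summarizeTimeout(activation));
```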
