我创建了两个非常简单的网络组件。两者都有一个内置于serverjs的sse服务的侦听器。唯一的区别是,一个端点每5秒钟响应一个简单的数据,而另一个端点则使用一个kafka主题,并在任何时候在该主题中发布新消息时响应一个简单的消息。
当我启动后端(即两个端点)时,两个网络计算机都正常工作。一两分钟后,使用kafka消息的webcomponet抛出nodejs端点,结果崩溃,没有值得一提的消息(至少对我来说不值得。
检查nodejs服务,我可以看到它工作正常。我的意思是,每当我向主题发布消息时(kafka-console-producer.bat——代理列表)localhost:9092 --topic test'data:testing')我可以看到nodejs consumer.on('message',函数(message){…)立即被触发。
对我来说很奇怪的是,webcomponent接收到的第一条消息,但是它只是停止接收而没有任何异常(好吧,至少我在chrome调试中没有发现任何异常)。
所有代码都在github中,如果您有zookeeper server+kafka server+topic named test并发送任何类似“data:my test”的消息,那么只需npm instal&npm start就可以使用这两个后端了:
前端:https://github.com/jimisdrpc/simplest-webcomponet/blob/master/public/index.html
后端:https://github.com/jimisdrpc/simplest-kafkaconsumer
以下是最相关的部分:
响应来自kafka主题的消息捕获的终结点:
...
response.writeHead(200, {
Connection: "keep-alive",
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
'Access-Control-Allow-Origin': '*'
});
consumer.on('message', function (message) {
response.write("event: sendMsgFromKafka\n");
response.write('id: ' + i++ + '\n');
response.write('data: ' + message.value);
response.write("\n\n");
});
前端:整个网络组件仅成功读取第一个事件,并在几分钟后崩溃:
const template = document.createElement('template');
template.innerHTML = `<input id="inputKafka"/> `;
class InputKafka extends HTMLElement {
constructor() {
super();
}
connectedCallback() {
this.attachShadow({mode: 'open'})
this.shadowRoot.appendChild(template.content.cloneNode(true))
const inputKafka = this.shadowRoot.getElementById('inputKafka');
var source = new EventSource('http://localhost:5000/kafka_sse');
source.addEventListener('sendMsgFromKafka', function(e) {
console.log('fromKafka');
inputKafka.value = e.data;
}, false);
}
attributeChangedCallback(name, oldVal, newVal) {
console.log('attributeChangedCallback');
}
disconnectedCallback() {
console.log('disconnectedCallback');
}
}
window.customElements.define("input-kafka", InputKafka);
我猜这与Kafka无关,因为我可以看到nodejs总是成功地消费主题信息。也许我必须在响应头中设置一些额外的参数,以便在一段时间后它不会停止。有趣的是,对于其他webcomponet,它读取一个nodejs服务(自动响应的间隔为5秒)时从不停止工作(如果需要,您可以在github中检查整个代码,但它完全相同,只是没有使用kafka主题,而是使用setinterval(()=>{…,5000})。
有时当它停下来阅读nodejs kafka服务时 GET http://localhost:5000/kafka_sse net::ERR_EMPTY_RESPONSE
例外,但并非总是如此。
综上所述:您可以将通过nodejs架构消费消息的webcomponet看作:
Webcomponent Eventsource Listener added when loaded the page in Browser -> Alive connection Opened -> Kafka Topic Message produced -> NodeJs Kafka Consumer consume message -> HTTP Server responds to Webcomponent
没有Kafka的那一个:
Webcomponent Eventsource Listener added when loaded the page in Browser -> Alive connection Opened -> setInterval -> HTTP Server responds to Webcomponent according to the interval and never crashes
我的主要问题是:为什么这个体系结构在几次停顿之后就停止了,而完全相同的体系结构,但有规则的事件却没有?一些见解考虑到可能的技巧,以保持连接或一些额外的细节,因为nodejs是单线程将受到高度赞赏。
暂无答案!
目前还没有任何答案,快来回答吧!