TypeScript: Implementing SSE in Angular

gcuhipw9 posted on 2023-06-24 in TypeScript

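I am trying to implement SSE (server-sent events) in my test application. The server side is already set up; I am only consuming the endpoint (api/v1/sse/document). The scenario is that when I perform a scan, the scan should appear in both my test client and the main application. The problem is that it only shows up in my test client when I refresh the page. I have written some code, but I still cannot get the events to update the test client automatically.
Here is the code I wrote: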
sse.service.ts (posted as a screenshot in the original question)
[document-list.component.ts]

    public ngOnInit(): void {
        this.getDocuments();
        this.registerServerSentEvent();
    }

    public ngOnDestroy(): void {
        this.closeServerSentEvent();
    }

    /**
     * getDocuments: call the api service to get the list of documents.
     */
    public getDocuments(): void {
        this.aService.getDocuments().subscribe((documents: Documents[]) => {
            this.documents = documents;
        });
    }

    /**
     * markDocumentAsProcessed: call the api service to mark the document as processed.
     */
    public markDocumentAsProcessed(document: Documents): void {
        this.aService.markDocumentProcessed(document.document.id).subscribe({
            next: () => {
                // Remove the processed document from the list
                this.documents = this.documents.filter((doc) => doc.document.id !== document.document.id);
                this.closeDialog();
            },
            error: (error) => {
                console.log("markDocumentProcessed Error:", error);
                // Handle the error here
            },
        });
    }

    /**
     * showDetails: call the api service to get the document image and display it in a dialog.
     */
    public showDetails(document: Documents): void {
        this.aService.getDocumentImage(document.document.id).subscribe((image: Blob) => {
            const url = window.URL.createObjectURL(image);
            const safeUrl: SafeUrl = this.sanitizer.bypassSecurityTrustUrl(url);
            this.selectedDocument = {...document, imageDataURL: safeUrl};
            this.displayDialog = true;
        });
    }

    /**
     * closeDialog: close the dialog after pressing the process button.
     */
    private closeDialog(): void {
        this.displayDialog = false;
        this.selectedDocument = null;
    }

    private registerServerSentEvent(): void {
        const sseUrl = `${this.aService.config.url}api/v1/sse/document`;
        this.sseSubscription = this.sseService.getServerSentEvent(sseUrl).subscribe((event: MessageEvent) => {
            const documentEvent = JSON.parse(event.data);
            const eventType = documentEvent.type;
            const eventData = documentEvent.data;

            switch (eventType) {
                case "NewDocument": {
                    // Process new document event
                    break;
                }

                case "ViewDocument": {
                    // Process view document event
                    break;
                }

                case "WatchlistStatusUpdate": {
                    // Process watchlist status update event
                    break;
                }

                case "DocumentProcessed": {
                    // Process document processed event
                    const processedDocumentId = eventData.documentId;
                    this.updateProcessedDocument(processedDocumentId);
                    break;
                }
            }
        });
    }

    private updateProcessedDocument(processedDocumentId: string): void {
        // Find the processed document in the documents list
        const processedDocumentIndex = this.documents.findIndex((doc) => doc.document.id === processedDocumentId);
        if (processedDocumentIndex !== -1) {
            // Remove the processed document from the list
            this.documents.splice(processedDocumentIndex, 1);
            // Update any other UI-related logic or perform additional actions as needed
        }
    }

    private closeServerSentEvent(): void {
        if (this.sseSubscription) {
            this.sseSubscription.unsubscribe();
            this.sseService.closeEventSource();
        }
    }
}

This is my service, which fetches the documents and so on:
[a.service.ts]

    public getDocuments(): Observable<Documents[]> {
        let url = this.config.url;
        if (!url.endsWith("/")) {
            url += "/";
        }

        return this.http
            .get<Documents[]>(`${url}api/v1/document/`, {
                headers: {
                    authorization: this.config.authorization,
                },
            })
            .pipe(
                map((data) => {
                    return data;
                }),
                catchError((error) => {
                    throw error;
                })
            );
    }

    /**
     * markDocumentProcessed: calls the api service to mark the document as processed.
     *
     */
    public markDocumentProcessed(documentId: string): Observable<Documents[]> {
        let url = this.config.url;
        if (!url.endsWith("/")) {
            url += "/";
        }

        const requestBody = {
            DocumentId: documentId,
        };

        return this.http
            .post<Documents[]>(`${url}api/v1/document/processed`, requestBody, {
                headers: {
                    authorization: this.config.authorization,
                },
            })
            .pipe(
                map((data) => {
                    return data;
                }),
                catchError((error) => {
                    throw error;
                })
            );
    }

    /**
     * getDocumentImage: calls the api service to get the document image.
     *
     */
    public getDocumentImage(documentId: string): Observable<Blob> {
        let url = this.config.url;
        if (!url.endsWith("/")) {
            url += "/";
        }

        return this.http.get(`${url}api/v1/document/${documentId}/image/Photo`, {
            responseType: "blob",
            headers: {
                authorization: this.config.authorization,
            },
        });
    }

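I do not get any errors in the console, and the endpoint returns 200, but my test client does not receive the events automatically; I have to refresh to see the changes.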

zqry0prt (answer #1)

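The problem is that you are using the SSE endpoint as a regular GET endpoint. Naturally, you only get updates when you request them, which in your case means reloading the page.
This is what you are doing (taken from the screenshot of your SSE service):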

public getServerSentEvent(url: string): Observable<MessageEvent> {
  const token = this.aService.config.authorization;
  // WARNING - SSE doesn't have an API for Headers, this won't work if used as proper SSE
  const headers = new HttpHeaders({
    Authorization: token,
  });

  return new Observable<MessageEvent>((observer) => {
    // HERE - you are not creating the SSE Event Source, just getting from it
    this.http.get(url, {headers, responseType: "text"}).subscribe({
      //...
    })
  })
}

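So you call HTTP GET on the SSE endpoint and then disguise the response as an SSE event. As you have seen, that does not work.
You should open a new EventSource instead, which will actually behave the way SSE is supposed to: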

// Creates SSE event source, handles SSE events
protected createSseEventSource(): void {
  // Close event source if current instance of SSE service has some
  if (this.eventSource) {
    this.closeSseConnection();
    this.eventSource = null;
  }
  // Open new channel, create new EventSource - provide your own URL
  this.eventSource = new EventSource(this.yourSSEurl);

  // Process default event
  this.eventSource.onmessage = (event: MessageEvent) => {
    this.zone.run(() => this.processSseEvent(event));
  };

  // Handle your own named events. SSE_EVENTS is assumed to be a string enum, e.g.:
  /*
    enum SSE_EVENTS {
      NewDocument = 'NewDocument',
      //...
    }
  */
  Object.values(SSE_EVENTS).forEach((eventName) => {
    this.eventSource.addEventListener(eventName, (event) => {
      this.zone.run(() => this.processSseEvent(event as MessageEvent));
    });
  });

  // Process connection opened
  this.eventSource.onopen = () => {
    this.reconnectFrequencySec = 1;
  };

  // Process error
  this.eventSource.onerror = (error: any) => {
    this.reconnectOnError();
  };
}
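
The handlers above reset reconnectFrequencySec to 1 on open and call reconnectOnError() on failure, but that method is not shown here. Below is a minimal sketch of the delay calculation it might use, assuming a doubling backoff with a cap. nextReconnectDelaySec and MAX_RECONNECT_SEC are hypothetical names, and the 64-second cap is an arbitrary choice, not something taken from the answer:

```typescript
// Hypothetical cap for the backoff; pick a value that suits your server.
const MAX_RECONNECT_SEC = 64;

/**
 * Returns the delay to use for the NEXT reconnect attempt:
 * double the current delay, but never exceed maxSec.
 * (Illustrative helper, not part of EventSource or Angular.)
 */
function nextReconnectDelaySec(
  currentSec: number,
  maxSec: number = MAX_RECONNECT_SEC
): number {
  return Math.min(currentSec * 2, maxSec);
}

// Possible use inside a reconnectOnError() method (sketch only):
//   const waitMs = this.reconnectFrequencySec * 1000;
//   this.reconnectFrequencySec = nextReconnectDelaySec(this.reconnectFrequencySec);
//   setTimeout(() => this.createSseEventSource(), waitMs);
```

Because onopen sets the delay back to 1, a successful connection restarts the sequence from the shortest wait.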

Have a look at my older answer, Handling SSE reconnect on error. There you can find almost everything you need for an SSE service.
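
Once events actually arrive, the component from the question still has to apply them to its list: its NewDocument branch is empty, so a new scan would not show up even with a working EventSource. Below is a minimal sketch of that step as a pure function. The NewDocument payload is an assumption (the question only shows eventData.documentId for DocumentProcessed); here it is assumed to carry the new document in the same shape as the list items. DocumentItem and applyDocumentEvent are hypothetical names:

```typescript
// Minimal shape assumed from the question's code; the real model has more fields.
interface DocumentItem {
  document: { id: string };
}

/**
 * Applies one SSE payload (the raw event.data string) to the current list
 * and returns a NEW array, leaving the input untouched.
 */
function applyDocumentEvent(
  documents: readonly DocumentItem[],
  rawData: string
): DocumentItem[] {
  const event = JSON.parse(rawData) as { type: string; data: any };

  switch (event.type) {
    case "NewDocument": {
      // ASSUMPTION: data is the new document itself, shaped like a list item.
      const added = event.data as DocumentItem;
      const exists = documents.some((d) => d.document.id === added.document.id);
      // Ignore duplicates; otherwise put the new document first (ordering is a guess).
      return exists ? documents.slice() : [added].concat(documents);
    }
    case "DocumentProcessed":
      // Same field the question reads: eventData.documentId.
      return documents.filter((d) => d.document.id !== event.data.documentId);
    default:
      // Unknown or unhandled event types leave the list unchanged.
      return documents.slice();
  }
}
```

In the component this could be used as this.documents = applyDocumentEvent(this.documents, event.data). If the server sends only an id for new documents, calling getDocuments() again in that branch is the simpler option.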
