java 使用Retrofit使用服务器发送的事件

wbgh16ku  于 2023-05-15  发布在  Java
关注(0)|答案(4)|浏览(419)

我正在尝试使用REST API [1],它将服务器发送的事件发送到客户端。我目前正在使用从广场改造消费这一点,但我不知道如何做到这一点。有经验的人能帮忙吗?如果没有改进,请推荐其他可以做到这一点的Java库。
[1] https://mesosphere.github.io/marathon/docs/rest-api.html#get-v2-events

o2gm4chl

o2gm4chl1#

试试这个库:oksee
OkSse是OkHttp的扩展库,用于创建服务器发送事件(SSE)客户端
因为我经历了同样的问题,从我的研究,这是最好的选择,现在,因为翻新不支持它。
https://github.com/heremaps/oksse

btxsgosb

btxsgosb2#

我知道这是个老问题。但是我没有找到一个完整的例子,现在试着用我的代码来提供它。我们只使用retrofitcoroutines
1.在retrofit API interface中需要添加代码。注意我们使用的@Streaming和返回类型Call<ResponseBody>

@POST("/v1/calc/group-prices")
@Streaming
fun calculateGroupPrices(@Body listOptions: List<GroupCalculatorOptions>): Call<ResponseBody>

2.在你的repository类中需要添加此代码。注意我们使用flow并读取stream。要理解带有有效负载的消息已经到达,它必须以"data:"开始

fun loadGroupDeliveryRateInfos(listOptions: List<GroupCalculatorOptions>) = flow {
        coroutineScope {
            val response = restApi.calculateGroupPrices(listOptions).execute()
            if (response.isSuccessful) {
                val input = response.body()?.byteStream()?.bufferedReader() ?: throw Exception()
                try {
                    while (isActive) {
                        val line = input.readLine() ?: continue
                        if (line.startsWith("data:")) {
                            try {
                                val groupDeliveryRateInfo = gson.fromJson(
                                    line.substring(5).trim(),
                                    GroupDeliveryRateInfo::class.java
                                )
                                emit(groupDeliveryRateInfo)
                            } catch (e: Exception) {
                                e.printStackTrace()
                            }
                        }
                    }
                } catch (e: IOException) {
                    throw Exception(e)
                } finally {
                    input.close()
                }
            } else {
                throw HttpException(response)
            }
        }
    }

3.最后一步,我们需要在ViewModel中收集数据。我们只需要从repository调用方法

repository.loadGroupDeliveryRateInfos(it.values.toList())
                .collect { info ->
                    handleGroupDeliveryRateInfo(info)
                }

这就是全部,不需要额外的库。

j2qf4p5b

j2qf4p5b3#

没有真实的将Retrofit和SSE混为一谈。使用reflect获取一个输入流,然后查找(或编写)一个将SSE事件分块的输入流解析器。
在reflect中,我有这个:

public interface NotificationAPI {
    @GET("notifications/sse")
    Call<InputStream> getNotificationsStream(@retrofit2.http.Query("Last-Event-ID") String lastEventId);
}

我为InputStream写了一个快速转换器工厂:

public class InputStreamConverterFactory extends Converter.Factory {

    private static class InputStreamConverter implements Converter<ResponseBody, InputStream> {
        @Nullable
        @Override
        public InputStream convert(ResponseBody value) throws IOException {
            return value.byteStream();
        }
    }

    @Override
    public @Nullable
    Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations, Retrofit retrofit) {
        if (type.equals(InputStream.class)) {
            return new InputStreamConverter();
        }
        return null;
    }
}

我的客户端代码看起来像这样:

var cinputStream = api.getNotificationsStream(null);
var inputStream = cinputStream.execute().body();
try(var sseStream = new MySSEStreamParser(inputStream)) {
   //handle the stream here...
}

有一个OkHttp SSE解析器,您可能会使用它。然而:

  • OkHttp SSE代码带有线程。您可能希望使用自己的线程模型。
  • 实际的OkHttp SSE解析器是一个内部包。这并不意味着它是一个很好的候选人举。
wh6knrhe

wh6knrhe4#

下面是一个稍微更新的方法,我刚刚基于this博客开始工作。它使用Retrofit的@Streaming,带有Kotlin流和缓冲读取器。SseHeartbeatData只是sse端点返回的自定义json负载。

import com.google.gson.Gson
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.isActive
import okhttp3.ResponseBody
import retrofit2.http.GET
import retrofit2.http.Streaming

interface NetworkService {
  @Streaming
  @GET("/network/sse/heartbeat")
  suspend fun getHeartbeat() : ResponseBody
}

data class SseHeartbeatData(val count: Int, val timestamp: Long, val requestId: String)

class SseRepository(private val networkService: NetworkService) {

fun getHeartbeat() = flow {
    coroutineScope {
        val gson = Gson()
        val inputReader = networkService.getHeartbeat().byteStream().bufferedReader()

        var event: String? = null
        var heartbeatData: SseHeartbeatData? = null

        while (isActive) {
            val line = inputReader.readLine()

            when {
                line.startsWith("event:") -> {
                    event = line.removePrefix("event:").trim()
                }
                line.startsWith("data:") -> {
                    val jsonString = line.removePrefix("data:").trim()

                    if (event == "heartbeat") {
                        heartbeatData = gson.fromJson(jsonString, SseHeartbeatData::class.java)
                    }

                    event = null
                }
                line.isEmpty() -> {
                    heartbeatData?.let {
                        emit(it)
                    }
                    heartbeatData = null
                }
            }
        }
        inputReader.close()
    }
}

}
然后,您可以在视图模型中收集它,并根据需要处理异常。

_sseJob = viewModelScope.launch(Dispatchers.IO) {
        sseRepository.getHeartbeat().catch {
            Log.w("MainViewModel", it)
            _sseJob?.cancel()
            _uiState.value = SseNotActive
        }
            .collect {
            _uiState.value = SseActive(count = it.count)
        }
    }

相关问题