使用kafka作为消息队列

ohfgkhjo  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(299)

我正在尝试将kafka用作控制器中的消息队列(so任务):

@RequestMapping("users")
@RestController
public class UserController extends BaseController {

    private static final String KAFKA_TOPIC = "virto_users";

    @Autowired
    private KafkaTemplate<String, String> mKafkaTemplate;

    @PutMapping(value = "{id}")
    public ResponseEntity<String> put(@PathVariable UUID id,
                                      @RequestBody String profile) {
        String url = ServerConfig.USERS_HOST + "/users/" + id;
        ResponseEntity<String> entity = request.put(url, profile);
        HttpStatus status = entity.getStatusCode();

        if (status == HttpStatus.CREATED || status == HttpStatus.NO_CONTENT) {
            return entity;
        }

        JSONObject json = new JSONObject();
        json.put("id", id);
        json.put("profile", profile);
        sendMessage(json.toString());

        return new ResponseEntity<>(HttpStatus.NO_CONTENT);
    }

    public void sendMessage(String msg) {
        mKafkaTemplate.send(KAFKA_TOPIC, msg);
    }

    @KafkaListener(topics = KAFKA_TOPIC, groupId = KafkaConfig.GROUP_ID)
    public void listen(String message) {
        JSONObject json = new JSONObject(message);
        UUID id = UUID.fromString(json.getString("id"));
        String profile = json.getString("profile");
        put(id, profile);
    }
}

如果我的内部服务可用并返回已创建或无内容,则一切正常,我可以返回结果,否则我需要返回无内容状态,并将此消息放入我的队列中重试,直到它被处理为止。我像上面那样做的,但这看起来不是个好办法。我只是想问一些建议,我如何才能改善这个解决方案,或它是正常的。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题