据我们所知 Topic kafka中的创建应该在服务器初始化部分处理。在这里我们使用默认脚本 ./kafka-topics --zookeeper ... ,但是如果我们需要动态地创建一个主题呢?
Topic
./kafka-topics --zookeeper ...
fnx2tebb1#
延伸安德烈·内恰耶夫的答案在10.2.0中,获取createtopicsrequest示例的方式发生了一些变化。我们需要使用生成器内部类来构建createtopicsrequest示例。下面是一个代码示例。
CreateTopicsRequest.Builder builder = new CreateTopicsRequest.Builder(topicConfig, timeout, false); CreateTopicsRequest request = builder.build();
oprakyz72#
幸运的是, Kafka 0.10.1.0 给我们带来了这种能力。我在confluence jira板上看到了这些引人入胜的特性,但是找不到任何与这个主题相关的文档,讽刺,不是吗?所以,我找到了源代码,找到了动态创建主题的方法。希望它能对你们中的一些人有所帮助。当然,如果您有更好的解决方案,请随时与我们分享。好,我们开始吧。
Kafka 0.10.1.0
/**The method propagate topics**/ public List<String> propagateTopics(int partitions, short replication, int timeout) throws IOException { CreateTopicsRequest.TopicDetails topicDetails = new CreateTopicsRequest.TopicDetails(partitions, replication); Map<String, CreateTopicsRequest.TopicDetails> topicConfig = mTopics.stream() .collect(Collectors.toMap(k -> k, v -> topicDetails)); // 1 CreateTopicsRequest request = new CreateTopicsRequest(topicConfig, timeout); // 2 try { CreateTopicsResponse response = createTopic(request, BOOTSTRAP_SERVERS_CONFIG); // 3 return response.errors().entrySet().stream() .filter(error -> error.getValue() == Errors.NONE) .map(Map.Entry::getKey) .collect(Collectors.toList()); // 4 } catch (IOException e) { log.error(e); } return null; } ``` `1` 我们需要一个 `TopicDetails` ,为简单起见,我将在所有主题中共享相同的配置。假设 `mTopics` 是要创建的所有主题的字符串列表。 `2` 基本上我们想发送一个请求到我们的kafka集群,现在我们有了一个特殊的类,它接受 `CreateTopicsRequest` 和超时 `3` 我们需要发送请求并得到 `CreateTopicsResponse` ``` private static final short apiKey = ApiKeys.CREATE_TOPICS.id; private static final short version = 0; private static final short correlationId = -1; private static CreateTopicsResponse createTopic(CreateTopicsRequest request, String client) throws IllegalArgumentException, IOException { String[] comp = client.split(":"); if (comp.length != 2) { throw new IllegalArgumentException("Wrong client directive"); } String address = comp[0]; int port = Integer.parseInt(comp[1]); RequestHeader header = new RequestHeader(apiKey, version, client, correlationId); ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf()); header.writeTo(buffer); request.writeTo(buffer); byte byteBuf[] = buffer.array(); byte[] resp = requestAndReceive(byteBuf, address, port); ByteBuffer respBuffer = ByteBuffer.wrap(resp); ResponseHeader.parse(respBuffer); return CreateTopicsResponse.parse(respBuffer); } private static byte[] requestAndReceive(byte[] buffer, String address, int port) throws IOException { try(Socket socket = new Socket(address, port); DataOutputStream dos = new DataOutputStream(socket.getOutputStream()); DataInputStream dis = new DataInputStream(socket.getInputStream()) ) { dos.writeInt(buffer.length); dos.write(buffer); dos.flush(); byte resp[] = new byte[dis.readInt()]; dis.readFully(resp); return resp; } catch (IOException e) { log.error(e); } return new byte[0]; }
这里没有什么魔力,只是发送请求,而不是解析字节流到响应。4 CreateTopicsResponse 有财产 errors ,这只是 Map<String, Errors> 哪里 key 是您请求的主题名称。棘手的是,它包含了您请求的所有主题,但是没有错误的主题是有价值的 Errors.None ,这就是我筛选响应并返回仅成功创建的主题的原因。
CreateTopicsResponse
errors
Map<String, Errors>
key
Errors.None
2条答案
按热度按时间fnx2tebb1#
延伸安德烈·内恰耶夫的答案
在10.2.0中,获取createtopicsrequest示例的方式发生了一些变化。我们需要使用生成器内部类来构建createtopicsrequest示例。下面是一个代码示例。
oprakyz72#
幸运的是,
Kafka 0.10.1.0
给我们带来了这种能力。我在confluence jira板上看到了这些引人入胜的特性,但是找不到任何与这个主题相关的文档,讽刺,不是吗?所以,我找到了源代码,找到了动态创建主题的方法。希望它能对你们中的一些人有所帮助。当然,如果您有更好的解决方案,请随时与我们分享。
好,我们开始吧。
这里没有什么魔力,只是发送请求,而不是解析字节流到响应。
4
CreateTopicsResponse
有财产errors
,这只是Map<String, Errors>
哪里key
是您请求的主题名称。棘手的是,它包含了您请求的所有主题,但是没有错误的主题是有价值的Errors.None
,这就是我筛选响应并返回仅成功创建的主题的原因。