我一直在阅读apache kafka API文档from here
this is the code that i have made in python which is TCP connection to kafka broker sitting at 9092 with all the server.properties are default for kafka
import socket
import struct
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Connect the socket to the port where the server is listening
server_address = ('localhost', 9092)
sock.connect(server_address)
#int16=2bytes=h
#int32=4bytes=i
#int64=8bytes=q
def string_to_binary(topic):
topics=bytearray()
topics+=struct.pack(">h",len(topic)) #INT16
topics+=bytes(topic,"utf-8")
return topics
request_api_key,request_api_version,correlation_id=1,0,7
headers=bytearray()
headers+=struct.pack(">hhi",request_api_key,request_api_version,correlation_id)
replica_id,max_wait_ms,min_bytes=0,10,100
body=bytearray()
body+=struct.pack(">iii",replica_id,max_wait_ms,min_bytes)
topics=bytearray()
topics+=string_to_binary("messages")# topic name
topics+=struct.pack(">iqi",1,0,10)# partition array
request=headers+body+topics
print("data is sent out")
sock.sendall(request)
# Receive the response
response = bytearray()
while True:
print("waiting for data")
data = sock.recv(1024)
print("intermediate data which is blocking")
if not data:
break
response += data
print(response)
此代码用于获取partiton为0且offset为0的主题messages
,输出为空,这是来自kafka broker日志9092
的屏幕截图
所以是的,org. apache. kafka. common. network.接收无效异常:接收无效(大小= 1919903841大于104857600),因此最大大小为104857600,我已将socket.request.max.bytes
更改为2019903841
则这是新的错误
有没有人能解决这个问题?
先谢了!
我的主要目标是连接到apachekafkaAPI,我想从特定主题中获取特定偏移量的所有数据
1条答案
按热度按时间a8jjtwal1#
从一个特定的主题中提取特定偏移量的所有数据
对于
kafka-python
KafkaConsumer
示例,您已经可以通过assign
-ing主题分区和seek
-ing偏移量,然后poll
-ing数据来实现这一点