通过python连接到apache Kafka API

hrysbysz  于 2023-01-04  发布在  Python
关注(0)|答案(1)|浏览(148)

我一直在阅读apache kafka API文档from here
this is the code that i have made in python which is TCP connection to kafka broker sitting at 9092 with all the server.properties are default for kafka

import socket
import struct

# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# Connect the socket to the port where the server is listening
server_address = ('localhost', 9092)
sock.connect(server_address)


#int16=2bytes=h
#int32=4bytes=i
#int64=8bytes=q

def string_to_binary(topic):

    topics=bytearray()
    topics+=struct.pack(">h",len(topic)) #INT16
    topics+=bytes(topic,"utf-8")
    return topics
request_api_key,request_api_version,correlation_id=1,0,7
headers=bytearray()
headers+=struct.pack(">hhi",request_api_key,request_api_version,correlation_id)

replica_id,max_wait_ms,min_bytes=0,10,100
body=bytearray()
body+=struct.pack(">iii",replica_id,max_wait_ms,min_bytes)


topics=bytearray()
topics+=string_to_binary("messages")# topic name
topics+=struct.pack(">iqi",1,0,10)#  partition array

request=headers+body+topics
print("data is sent out")
sock.sendall(request)
# Receive the response
response = bytearray()
while True:
    print("waiting for data")
    data = sock.recv(1024)
    print("intermediate data which is blocking")
    if not data:
        break
    response += data
print(response)

此代码用于获取partiton为0且offset为0的主题messages,输出为空,这是来自kafka broker日志9092

的屏幕截图
所以是的,org. apache. kafka. common. network.接收无效异常:接收无效(大小= 1919903841大于104857600),因此最大大小为104857600,我已将socket.request.max.bytes更改为2019903841
则这是新的错误

有没有人能解决这个问题?
先谢了!
我的主要目标是连接到apachekafkaAPI,我想从特定主题中获取特定偏移量的所有数据

a8jjtwal

a8jjtwal1#

从一个特定的主题中提取特定偏移量的所有数据
对于kafka-pythonKafkaConsumer示例,您已经可以通过assign-ing主题分区和seek-ing偏移量,然后poll-ing数据来实现这一点

相关问题