使用dataloader接口kafka数据

b1uwtaje  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(348)

我使用dataloader来推断kafka中的数据,但它不起作用
这是我的密码

class kfkdataset(Dataset):
def __init__(self,consumer,image_size):
    super(kfkdataset).__init__()
    self.image_size=image_size
    self.consumer = consumer
def __getitem__(self, index):
    info = json.loads(next(self.consumer).value)
    image_osspath = info['path']
    image = prep_image_batch(image_osspath,self.image_size)
    return image,image_osspath

def __len__(self):
    # You should change 0 to the total size of your dataset.
    return 9000000

consumer = KafkaConsumer('my-topic',bootstrap_servers=[])

prodataset = kfkdataset(consumer,image_size=608)#)
k = DataLoader(prodataset,
        batch_size=batch_size,
        num_workers=16)
for inputimage,osspath in k:

    inputimage = inputimage.to(device)
    detections,_ = model(inputimage)
detections = non_max_suppression(detections, 0.98, 0.4)

当num\u workers为1时,它起作用
当num\u workers>1时:出现错误
文件“batch\u upload.py”,第80行,in for inputimage,osspath in k:file“/usr/local/lib/python3.6/dist packages/torch/utils/data/dataloader.py”,第801行,in\u next\u return self.\u process\u data(data)file“/usr/local/lib/python3.6/dist packages/torch/utils/data/dataloader.py”,第846行,在\u process \u data data.reraise()file“/usr/local/lib/python3.6/dist packages/torch/\u utils.py”中,第369行,在reraise raise self.exc\u type(msg)fileexistserror:dataloader worker进程1中捕获的fileexistserror。原始回溯(最后一次调用):file“/usr/local/lib/python3.6/dist-packages/torch/utils/data/\u-utils/worker.py”,第178行,\u-worker\u-loop data=fetcher.fetch(index)file“/usr/local/lib/python3.6/dist-packages/torch/utils/data/\u-utils/fetch.py”,第44行,在fetch data=[self.dataset[idx]for idx in mably \u batched \u index]文件“/usr/local/lib/python3.6/dist packages/torch/utils/data/\u utils/fetch.py”第44行,在data=[self.dataset[idx]for idx in mably \u batched \u index]文件“/appbatch/utils/utils.py”第49行,在getitem info=json.loads(next(self.consumer.value)file“/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py”第1192行,在next return self.next v2()file“/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py”第1200行,在next\u v2 return next(self.\u iterator)file“/usr/local/lib/python3.6/dist packages/kafka/consumer/group.py”第1115行,在\u message\u generator\u v2 record\u map=self.poll(timeout\u ms=timeout\u ms,update\u offsets=false)file“/usr/local/lib/python3.6/dist packafka/consumer/group.py”第654行,在poll records=self.\u poll\u once(剩余,max_records,update_offsets=update_offsets)file“/usr/local/lib/python3.6/dist packages/kafka/consumer/group.py”,第701行,在“poll\u once self.\u client.poll(timeout_ms=timeout_ms)file“/usr/local/lib/python3.6/dist packages/kafka/client_async.py”第600行,在poll self.\u poll(timeout/1000)file“/usr/local/lib/python3.6/dist packages/kafka/client\u async.py”第629行,在\u poll self.\u register\u send\u sockets()file“/usr/local/lib/python3.6/dist packages/kafka/client\u async.py”第619行,在\u register\u send\u sockets self.\u selector.modify(key.fileobj,events,key.data)文件“/usr/lib/python3.6/selectors.py”,第261行,在modify key=self.register(fileobj,events,data)文件“/usr/lib/python3.6/selectors.py”,第412行,在register self.\u epoll.register(key.fd,epoll\u events)文件existserror:[errno 17]文件存在
我想知道怎么做

c86crjj0

c86crjj01#

基本上,设置 num_workers > 1 在pytorch中,dataloader创建了几个工作进程,这些工作进程反过来绑定到同一个套接字端口,因为只有一个使用者。
并行化和改进从kafka导入数据的一种方法是在同一个消费者组中为该主题创建多个消费者。

相关问题