scrapy clusters kafka\u monitor.py中断

ovfsdjhp  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(386)

scrapy集群是一种扩展大型、连续scrapy项目的新方法。它是在一个vm中设置的,并与来自不同工具的多个示例一起运行。
我已经构建了虚拟机并安装了必要的工具,比如redis、kafka和zookeeper。
在第一次测试爬网之前,我只停留在一条简单的线路上,我不知道这是python问题、安装问题还是什么。
当我尝试的时候 python kafka_monitor.py run 我得到:
回溯(最近一次调用last):sys.exit(main())中的文件“kafka\u monitor.py”,第512行
文件“kafka\u monitor.py”,第497行,在kafka\u monitor.run()主目录中
文件“kafka\u monitor.py”,第413行,运行self.\u setup\u kafka()
文件“kafka\u monitor.py”,第232行,在\u setup\u kafka ret\u val=\u hidden\u setup()
文件“/usr/local/lib/python2.7/dist packages/scutils/method\u timer.py”,第46行,在f2 retval=f(*args)中
文件“kafka\u monitor.py”,第218行,在\u hidden\u setup self.kafka\u conn=kafkaclient(self.settings['kafka\u hosts'])中
typeerror:\uuuuu init\uuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu
这个 settings.py 包括:

KAFKA_HOSTS = 'localhost:9092'
KAFKA_INCOMING_TOPIC = 'demo.incoming'
KAFKA_GROUP = 'demo-group'
KAFKA_FEED_TIMEOUT = 5
KAFKA_CONN_TIMEOUT = 5

按照快速启动的指示,我做了一个 localsettings.py


# Here, 'scdev' is the host with Kafka, Redis, and Zookeeper

REDIS_HOST = 'scdev'
KAFKA_HOSTS = 'scdev:9092'
ZOOKEEPER_HOSTS = 'scdev:2181'

即使我进入 kafka_monitor.py 并更改第218行(从上面的初始错误回溯)
发件人:

self.kafka_conn = KafkaClient(self.settings['KAFKA_HOSTS'])

收件人:

self.kafka_conn = KafkaClient(self.settings['scdev:9092'])

我收到了同样的错误。

pepwfjgg

pepwfjgg1#

成功!
@麦迪逊·巴赫默谢谢你把我引向正确的方向。我只想澄清一下我是如何让它运行的。
中实际代码的第一行(因此第3行) kafka_monitor.py 内容如下:

from kafka.client import KafkaClient

我把它改成:

from kafka import KafkaClient

这就成功了!
这是一个短视的解决方案吗?如果没有:

from kafka.client import KafkaClient

我不知道,但我会更新这个在未来,如果问题具体到这个arrese。

omhiaaxx

omhiaaxx2#

虽然我不确定您的环境是什么,但我强烈建议您遵循本文简介中的快速入门指南,该指南教您如何设置和测试repo的克隆,甚至还提供了一个虚拟机供您运行和测试所有内容。我似乎无法复制您的问题,但下面是我针对kafka python(支持)和pykafka(不支持)运行的一些测试。
在这里, scdev 只是一个运行Kafka的虚拟机。使用Python2.7.10。
kafka python 0.9.5(固定在requirements.txt中):

>>> from kafka import KafkaClient
>>> k = KafkaClient()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: __init__() takes at least 2 arguments (1 given)
>>> k = KafkaClient('scdev:9092')
>>> k.ensure_topic_exists('test')
>>>

请注意,您不能调用 KafkaClient() 没有参数的构造函数,所以我不知道它是怎么工作的。
pykafka 2.2.1(最新版本)

>>> from pykafka import KafkaClient
>>> k = KafkaClient()
>>> k.ensure_topic_exists()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'KafkaClient' object has no attribute 'ensure_topic_exists'
>>> k2 = KafkaClient('scdev:9092')
>>>

注意,现在可以了,但是我们使用的是完全不同的库,我得到了相同的属性错误。我不知道你是否修改了程序包导入,但我似乎在项目中找不到任何关于pykafka的引用。
kafka python 1.0.1(pypi的最新版本)

>>> from kafka import KafkaClient
>>> k = KafkaClient()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/madisonb/.local/share/virtualenvs/sc3/lib/python2.7/site-packages/kafka/__init__.py", line 41, in __init__
    super(KafkaClient, self).__init__(*args,**kwargs)
TypeError: __init__() takes at least 2 arguments (1 given)
>>> k = KafkaClient('scdev:9092')
>>> k.ensure_topic_exists('test')
>>>

只需三次检查它是否在最新的pypi包上工作。
同样重要的是要注意变量 settings 问题中引用的只是设置文件中元素的字典。这么叫 self.settings['scdev:9092'] 是没有意义的,因为那个密钥不存在。真正地, self.settings['KAFKA_HOSTS'] 只是一个字典查找,看起来是字符串 'scdev:9092' 编辑:
我可以复制 __init__() 在kafka python 1.0.1中使用kafka\u monitor.py提供的语法时出错。

>>> from kafka.client import KafkaClient # new syntax for importing KafkaClient
>>> k = KafkaClient()
>>> k.ensure_topic_exists('test')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'KafkaClient' object has no attribute 'ensure_topic_exists'
>>> k2 = KafkaClient('scdev:9092')
Exception AttributeError: "'KafkaClient' object has no attribute '_wake_r'" in <bound method KafkaClient.__del__ of <kafka.client_async.KafkaClient object at 0x102c51d50>> ignored
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: __init__() takes exactly 1 argument (2 given)
>>>

看起来您安装的版本与requirements.txt文件不匹配。这只会给你带来更多问题,我建议你 pip install -r requirements.txt 看看这能不能解决问题。

相关问题