我正在测试一个使用 bulk
索引一些文档。
这是我的密码:
import mock
import unittest
import json
from elasticsearch.helpers import bulk
from ingestion.ingestor import Ingestor
class TestIngestor(unittest.TestCase):
def setUp(self):
self.ingestor = Ingestor()
@mock.patch("elasticsearch.helpers.bulk", mock.MagicMock(return_value=True))
def test_ingestor(self):
with open("tests/data/sample_payload.json", "r") as reader:
sample_messages = json.loads(reader.read())["Messages"]
actions = self.ingestor.ingest(sample_messages)
self.assertEqual(len(actions), 10)
然而,嘲笑似乎不起作用。。。当我运行它时,我会得到一个很长的连接拒绝错误列表。
我该怎么修?
1条答案
按热度按时间6za6bjd01#
结果发现我的补丁错了。。。下面是我如何修复它的: