AsyncElasticsearch client doesn't accept connection to AWS Elasticsearch - AttributeError: 'AWS4Auth' object has no attribute 'encode'

dba5bblo posted on 2023-10-17 in ElasticSearch

I'm using AWS Elasticsearch in my project, with the elasticsearch-py package to connect to the cluster.

AWS Elasticsearch version: 7.9
Python package: elasticsearch[async]==7.12.0

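I can't initialize the AsyncElasticsearch client with the AWS4Auth library (as shown in the official AWS documentation for Python Elasticsearch clients).
It should connect to the cluster successfully, but instead it raises this error: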

AttributeError: 'AWS4Auth' object has no attribute 'encode'

Here is my code snippet:

from elasticsearch import AsyncElasticsearch, AIOHttpConnection
from requests_aws4auth import AWS4Auth
import asyncio

host = 'my-test-domain.us-east-1.es.amazonaws.com'
region = 'us-east-1'
service = 'es'

credentials = {
    'access_key': "MY_ACCESS_KEY",
    'secret_key': "MY_SECRET_KEY"
}

awsauth = AWS4Auth(credentials['access_key'], credentials['secret_key'], region, service)

es = AsyncElasticsearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=AIOHttpConnection
)

async def test():
    print(await es.info())

asyncio.run(test())
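The traceback is consistent with how the 7.x AIOHttpConnection appears to treat `http_auth`: it only understands a "user:password" string or a (user, password) tuple, which it turns into a Basic `authorization` header via `urllib3.make_headers(basic_auth=...)`, and that call encodes the value as a string. An `AWS4Auth` object is a requests auth plugin, not a string, so `.encode` is missing. The sketch below is a simplified, stdlib-only re-creation of that path; `basic_auth_header` and `FakeAWS4Auth` are hypothetical names, not library code.

```python
import base64


def basic_auth_header(http_auth):
    """Simplified sketch of how a 7.x connection turns `http_auth` into a header.

    Hypothetical helper: it mirrors the idea, not elasticsearch-py's actual code.
    """
    if isinstance(http_auth, (tuple, list)):
        http_auth = ":".join(http_auth)
    # urllib3.make_headers(basic_auth=...) encodes the value like this,
    # so anything that is not a str fails with AttributeError.
    token = base64.b64encode(http_auth.encode("latin-1")).decode("ascii")
    return {"authorization": "Basic " + token}


class FakeAWS4Auth:
    """Hypothetical stand-in for requests_aws4auth.AWS4Auth (not a string)."""


print(basic_auth_header(("user", "pass")))  # {'authorization': 'Basic dXNlcjpwYXNz'}
try:
    basic_auth_header(FakeAWS4Auth())
except AttributeError as exc:
    print(exc)  # 'FakeAWS4Auth' object has no attribute 'encode'
```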
Answer 1 (fkaflof6)

from urllib.parse import urlencode

import boto3
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from elasticsearch import AIOHttpConnection


class AWSAuthAIOHttpConnection(AIOHttpConnection):
    """Enable AWS Auth with AIOHttpConnection for AsyncElasticsearch

    The AIOHttpConnection class built into elasticsearch-py is not currently
    compatible with passing AWSAuth as the `http_auth` parameter, as suggested
    in the docs when using AWSAuth for the non-async RequestsHttpConnection class:
    https://docs.aws.amazon.com/opensearch-service/latest/developerguide/request-signing.html#request-signing-python

    To work around this we patch `AIOHttpConnection.perform_request` method to add in
    AWS Auth headers before making each request.

    This approach was synthesized from
    * https://stackoverflow.com/questions/38144273/making-a-signed-http-request-to-aws-elasticsearch-in-python
    * https://github.com/DavidMuller/aws-requests-auth
    * https://github.com/jmenga/requests-aws-sign
    * https://github.com/byrro/aws-lambda-signed-aiohttp-requests
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._credentials = boto3.Session().get_credentials()

    async def perform_request(
        self, method, url, params=None, body=None, timeout=None, ignore=(), headers=None
    ):
        def _make_full_url(url: str) -> str:
            # These steps are copied from the parent class' `perform_request` implementation.
            # The Elasticsearch client only passes in the request path as the url,
            # and that partial url format is rejected by the `SigV4Auth` implementation
            if params:
                query_string = urlencode(params)
            else:
                query_string = ""

            # the parent class joins host + url_prefix + path, in that order
            full_url = self.host + self.url_prefix + url
            if query_string:
                full_url = "%s?%s" % (full_url, query_string)

            return full_url

        full_url = _make_full_url(url)
        if headers is None:
            headers = {}

        # this request object won't be used, we just want to copy its auth headers
        # after `SigV4Auth` processes it and adds the headers
        _request = AWSRequest(
            method=method, url=full_url, headers=headers, params=params, data=body
        )
        # NOTE: the region is hard-coded here; it must match your domain's region
        SigV4Auth(self._credentials, "es", "us-west-1").add_auth(_request)
        headers.update(_request.headers.items())

        # pass the original path-only `url`: the parent class rebuilds the full URL itself
        return await super().perform_request(
            method, url, params, body, timeout, ignore, headers
        )
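For reference, the "AWS Auth headers" that `SigV4Auth.add_auth` attaches are the Signature Version 4 `x-amz-date` and `authorization` headers. The stdlib-only sketch below shows roughly how they are derived, and why a path-only URL is not enough: the host is part of what gets signed. It is a simplified illustration, not botocore's implementation. It assumes only `host` and `x-amz-date` are signed, that the path needs no extra percent-encoding, and that there is no session token; the name `sigv4_headers` is hypothetical. In real code, keep using botocore's `SigV4Auth` as the answers do.

```python
import datetime
import hashlib
import hmac
from urllib.parse import parse_qsl, quote, urlsplit


def _hmac_sha256(key: bytes, msg: str) -> bytes:
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def sigv4_headers(method, url, body, access_key, secret_key, region,
                  service="es", now=None):
    """Hypothetical sketch: compute SigV4 x-amz-date/authorization headers.

    `body` must be bytes or None. Signs only host and x-amz-date, assumes the
    path needs no extra encoding, and ignores session tokens.
    """
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    amz_date = now.strftime("%Y%m%dT%H%M%SZ")
    date_stamp = now.strftime("%Y%m%d")

    parts = urlsplit(url)
    # The signed host comes from the full URL; a bare "/_search" has none.
    host = parts.hostname
    default_port = {"http": 80, "https": 443}.get(parts.scheme)
    if parts.port and parts.port != default_port:
        host = f"{host}:{parts.port}"

    # Canonical query string: URI-encoded pairs sorted by key, then value.
    pairs = sorted(
        (quote(k, safe="-_.~"), quote(v, safe="-_.~"))
        for k, v in parse_qsl(parts.query, keep_blank_values=True)
    )
    canonical_query = "&".join(f"{k}={v}" for k, v in pairs)

    signed_headers = "host;x-amz-date"
    canonical_request = "\n".join([
        method.upper(),
        parts.path or "/",
        canonical_query,
        f"host:{host}\nx-amz-date:{amz_date}\n",
        signed_headers,
        hashlib.sha256(body or b"").hexdigest(),
    ])

    scope = f"{date_stamp}/{region}/{service}/aws4_request"
    string_to_sign = "\n".join([
        "AWS4-HMAC-SHA256",
        amz_date,
        scope,
        hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(),
    ])

    # Signing key: an HMAC chain over date, region, service and a terminator.
    key = _hmac_sha256(("AWS4" + secret_key).encode("utf-8"), date_stamp)
    for part in (region, service, "aws4_request"):
        key = _hmac_sha256(key, part)
    signature = hmac.new(key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()

    return {
        "x-amz-date": amz_date,
        "authorization": (
            f"AWS4-HMAC-SHA256 Credential={access_key}/{scope}, "
            f"SignedHeaders={signed_headers}, Signature={signature}"
        ),
    }
```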
Answer 2 (anhgbhbe)

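I took @francojposa's answer above and fixed/adapted it. I tried to submit this as an edit to his answer, but got "suggested edit queue is full" or something like that.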
requirements.txt

boto3<2.0
elasticsearch[async]<7.14  # 7.14 added a product check that refuses non-Elastic servers, including AWS-hosted domains
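If you want that pin to fail loudly at start-up instead of surfacing later as a connection error, a small guard along these lines could help. It is a sketch under the assumption that 7.14 is the cutoff noted in the requirements comment; `release_tuple`, `pin_satisfied` and `check_installed_elasticsearch` are hypothetical helpers, and the only library call is the standard `importlib.metadata.version`.

```python
from importlib.metadata import PackageNotFoundError, version


def release_tuple(ver: str) -> tuple:
    """Turn '7.13.4' into (7, 13, 4), stopping at the first non-numeric part."""
    parts = []
    for piece in ver.split("."):
        if not piece.isdigit():
            break
        parts.append(int(piece))
    return tuple(parts)


def pin_satisfied(installed: str, max_exclusive=(7, 14)) -> bool:
    """True if `installed` is below the assumed 7.14 product-check cutoff."""
    return release_tuple(installed) < max_exclusive


def check_installed_elasticsearch() -> None:
    """Hypothetical start-up guard: fail early if the pin was not respected."""
    try:
        installed = version("elasticsearch")
    except PackageNotFoundError:
        raise RuntimeError("elasticsearch is not installed") from None
    if not pin_satisfied(installed):
        raise RuntimeError(
            f"elasticsearch {installed} is installed; this workaround expects <7.14"
        )
```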

Here is the main definition:

from urllib.parse import urlencode

import boto3
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from elasticsearch import AsyncElasticsearch, AIOHttpConnection

class AWSAuthAIOHttpConnection(AIOHttpConnection):
    """Enable AWS Auth with AIOHttpConnection for AsyncElasticsearch

    The AIOHttpConnection class built into elasticsearch-py is not currently
    compatible with passing AWSAuth as the `http_auth` parameter, as suggested
    in the docs when using AWSAuth for the non-async RequestsHttpConnection class:
    https://docs.aws.amazon.com/opensearch-service/latest/developerguide/request-signing.html#request-signing-python

    To work around this we patch `AIOHttpConnection.perform_request` method to add in
    AWS Auth headers before making each request.

    This approach was synthesized from
    * https://stackoverflow.com/questions/38144273/making-a-signed-http-request-to-aws-elasticsearch-in-python
    * https://github.com/DavidMuller/aws-requests-auth
    * https://github.com/jmenga/requests-aws-sign
    * https://github.com/byrro/aws-lambda-signed-aiohttp-requests
    """

    def __init__(self, *args, aws_region=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.aws_region = aws_region
        self._credentials = boto3.Session().get_credentials()
        self.auther = SigV4Auth(self._credentials, "es", self.aws_region)

    def _make_full_url(self, url: str, params=None) -> str:
        # These steps are copied from the parent class' `perform_request` implementation.
        # The ElasticSearch client only passes in the request path as the url,
        # and that partial url format is rejected by the `SigV4Auth` implementation
        query_string = urlencode(params) if params else None

        # same order as the parent class: host + url_prefix + path
        full_url = self.host + self.url_prefix + url
        if query_string:
            full_url = "%s?%s" % (full_url, query_string)

        return full_url

    async def perform_request(
        self, method, url, params=None, body=None, timeout=None, ignore=(), headers=None
    ):
        full_url = self._make_full_url(url)
        if headers is None:
            headers = {}

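        # this request object won't be used, we just want to copy its auth headers
        # after `SigV4Auth` processes it and adds the headers.
        # AIOHttpConnection sends HEAD requests as GET (an aiohttp workaround),
        # so sign with the method that actually goes over the wire.
        signed_method = "GET" if method == "HEAD" else method
        _request = AWSRequest(
            method=signed_method, url=full_url, headers=headers, params=params, data=body
        )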

        self.auther.add_auth(_request)
        headers.update(_request.headers.items())

        # passing in the original `url` param here works too
        return await super().perform_request(
            method, url, params, body, timeout, ignore, headers
        )
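The URL-rebuilding step in `_make_full_url` can be checked in isolation. A minimal sketch with a hypothetical standalone function, assuming the connection's `host` attribute looks like `https://<domain>:443` and that the parent class joins the pieces as host, then `url_prefix`, then the request path:

```python
from urllib.parse import urlencode


def make_full_url(host, url_prefix, path, params=None):
    """Rebuild the absolute URL that SigV4 signing needs from the pieces an
    elasticsearch-py connection keeps (hypothetical helper, not library API)."""
    # Same order the parent class uses: scheme://host[:port] + prefix + path
    full_url = host + url_prefix + path
    if params:
        full_url = "%s?%s" % (full_url, urlencode(params))
    return full_url


print(make_full_url("https://my-test-domain.us-east-1.es.amazonaws.com:443", "", "/_search", {"size": 5}))
# https://my-test-domain.us-east-1.es.amazonaws.com:443/_search?size=5
```

In the class above, `_make_full_url` is called without `params`, so the query string is left to `AWSRequest(params=...)`; as far as I can tell, botocore builds the canonical query string from `params` when they are set.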

Usage:

es_client = AsyncElasticsearch(
    ['https://aws-es-or-opensearch-url-goes-here'],
    use_ssl=True, verify_certs=True,
    connection_class=AWSAuthAIOHttpConnection, aws_region='us-east-1'
)

async def test():
    body = {...}
    results = await es_client.search(body=body, index='test', doc_type='test')  # I use ES 5/6
Answer 3 (rqqzpn5f)

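I think that with AWS4Auth you are tied to RequestsHttpConnection.
From the elasticsearch-py docs: "The default connection class is based on urllib3, which is more performant and lightweight than the optional requests-based class. Only use RequestsHttpConnection if you need any of requests' advanced features, like custom auth plugins."
https://elasticsearch-py.readthedocs.io/en/master/transports.html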
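Why AWS4Auth is tied to the requests-based class: requests treats an `auth` object as a plugin, a callable that receives the prepared request and returns it with extra headers added, and AWS4Auth is written as such a plugin. The urllib3- and aiohttp-based connection classes only use `http_auth` to build a Basic header and never call it. A stdlib-only sketch of that plugin protocol; `PreparedRequest`, `StaticHeaderAuth` and `send` are hypothetical stand-ins, not requests' own code.

```python
class PreparedRequest:
    """Hypothetical minimal stand-in for requests.PreparedRequest."""

    def __init__(self, method, url, headers=None):
        self.method = method
        self.url = url
        self.headers = dict(headers or {})


class StaticHeaderAuth:
    """A requests-style auth plugin: called with the prepared request,
    it adds headers and returns the request (AWS4Auth follows this pattern)."""

    def __init__(self, name, value):
        self.name = name
        self.value = value

    def __call__(self, request):
        request.headers[self.name] = self.value
        return request


def send(request, auth=None):
    # requests applies an auth object by calling it on the prepared request;
    # this hypothetical `send` mimics only that step, no network I/O.
    if auth is not None:
        request = auth(request)
    return request


req = send(PreparedRequest("GET", "https://example.com/_search"),
           auth=StaticHeaderAuth("x-demo", "1"))
print(req.headers)  # {'x-demo': '1'}
```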
Try this:

from elasticsearch import AsyncElasticsearch, RequestsHttpConnection

es = AsyncElasticsearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)

If the above doesn't work, use the non-async (synchronous) client instead:

from elasticsearch import Elasticsearch, RequestsHttpConnection

es = Elasticsearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)
