apache kafka python-self.async

8xiog9wr  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(827)

我已经配置了Heroku Kafka，运行环境是Python 3.8，并试图用下面的代码创建producer。错误如下，我已经粘贴了我的requirements.txt和我的代码。
请给我解决这个问题的办法。

File "kafka-producer.py", line 5, in <module>
import kafka_helper
File "/app/kafka_helper.py", line 19, in <module>
from kafka import KafkaProducer, KafkaConsumer
File "/usr/local/lib/python3.8/site-packages/kafka/__init__.py", line 22, in <module>
from kafka.producer import KafkaProducer
File "/usr/local/lib/python3.8/site-packages/kafka/producer/__init__.py", line 4, in <module>
from .simple import SimpleProducer
File "/usr/local/lib/python3.8/site-packages/kafka/producer/simple.py", line 54
    return '<SimpleProducer batch=%s>' % self.async
                                              ^
SyntaxError: invalid syntax

requirements.txt

Flask>=1.1.1
cffi==1.14.4
cryptography==3.2.1
enum34==1.1.10
ipaddress==1.0.23
kafka==1.3.5
kafka-helper==0.1
kafka-python==2.0.2
psycopg2==2.8.6
psycopg2-binary==2.8.6
pycparser==2.20
six==1.15.0
snowflake-connector-python==2.3.7

我的脚本（kafka-producer.py）

import snowflake.connector
import os
import json
import ssl
import kafka_helper
from kafka import KafkaProducer  # ,  KafkaConsumer

# Broker addresses and SSL context used by fn_kafka_producer below.
# Fixes two defects in the original: the get_kafka_ssl_context() call ended
# with a stray '.' (a SyntaxError), and V_KAFKA_URL was never defined even
# though fn_kafka_producer reads it (NameError at call time).
V_KAFKA_URL = kafka_helper.get_kafka_brokers()
V_SSL_CONTEXT = kafka_helper.get_kafka_ssl_context()

# Create Producer Properties

def fn_kafka_producer(acks='all',
                      value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                      bootstrap_servers=None):
    """
    Return a KafkaProducer that publishes JSON-encoded values over SSL.

    :param acks: producer acknowledgement mode, defaults to 'all'.
    :param value_serializer: callable turning a value into bytes; defaults
        to UTF-8-encoded JSON.
    :param bootstrap_servers: optional broker list; falls back to the
        module-level V_KAFKA_URL when not given (backward-compatible
        addition -- existing callers need no change).
    """
    if bootstrap_servers is None:
        bootstrap_servers = V_KAFKA_URL
    kafkaprod = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        # Without security_protocol='SSL', kafka-python defaults to
        # PLAINTEXT and silently ignores ssl_context; this also matches
        # get_kafka_producer in kafka_helper.py.
        security_protocol='SSL',
        # key_serializer=key_serializer,
        value_serializer=value_serializer,
        ssl_context=V_SSL_CONTEXT,
        acks=acks
    )
    return kafkaprod

if __name__ == '__main__':
    # KAFKA_TOPIC was undefined in the original script (NameError at
    # runtime); read it from the environment like the other Kafka settings.
    KAFKA_TOPIC = os.environ.get('KAFKA_TOPIC', 'test-topic')

    # Create the Producer
    PRODUCER = fn_kafka_producer()

    # Send a record and block until all buffered records are delivered
    PRODUCER.send(KAFKA_TOPIC, 'Hello Heroku!!')
    PRODUCER.flush()

我的kafka_helper.py

"""
用于创建kafka-python KafkaProducer和KafkaConsumer对象的助手方法
"""

import os
import json
import ssl
from tempfile import NamedTemporaryFile
try:
    from urllib.parse import urlparse
except ImportError:
    from urlparse import urlparse

from base64 import standard_b64encode

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization

from kafka import KafkaProducer, KafkaConsumer

def get_kafka_ssl_context():
    """
    Returns an SSL context based on the certificate information in the Kafka config vars.

    Reads KAFKA_CLIENT_CERT, KAFKA_CLIENT_CERT_KEY and KAFKA_TRUSTED_CERT
    from the environment; raises KeyError if any of them is missing.
    """
    # NOTE: We assume that Kafka environment variables are present. If using
    # Apache Kafka on Heroku, they will be available in your app configuration.
    #
    # 1. Write the PEM certificates necessary for connecting to the Kafka brokers to physical
    # files.  The broker connection SSL certs are passed in environment/config variables and
    # the python and ssl libraries require them in physical files.  The public keys are written
    # to short lived NamedTemporaryFile files; the client key is encrypted before writing to
    # the short lived NamedTemporaryFile
    #
    # 2. Create and return an SSLContext for connecting to the Kafka brokers referencing the
    # PEM certificates written above
    #

    # stash the kafka certs in named temporary files for loading into SSLContext.  Initialize the
    # SSLContext inside the with so when it goes out of scope the files are removed which has them
    # existing for the shortest amount of time.  As extra caution password
    # protect/encrypt the client key

    with NamedTemporaryFile(suffix='.crt') as cert_file, \
         NamedTemporaryFile(suffix='.key') as key_file, \
         NamedTemporaryFile(suffix='.crt') as trust_file:
        # client certificate (public part)
        cert_file.write(os.environ['KAFKA_CLIENT_CERT'].encode('utf-8'))
        cert_file.flush()

        # setup cryptography to password encrypt/protect the client key so it's not in the clear on
        # the filesystem.  Use the generated password in the call to load_cert_chain
        passwd = standard_b64encode(os.urandom(33))
        # password=None here assumes the env-provided key PEM is unencrypted
        # -- TODO confirm against the Heroku config var format.
        private_key = serialization.load_pem_private_key(
            os.environ['KAFKA_CLIENT_CERT_KEY'].encode('utf-8'),
            password=None,
            backend=default_backend()
        )
        # re-serialize the key encrypted with the throwaway password before
        # it touches the filesystem
        pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.BestAvailableEncryption(passwd)
        )
        key_file.write(pem)
        key_file.flush()

        # CA certificate used to verify the brokers
        trust_file.write(os.environ['KAFKA_TRUSTED_CERT'].encode('utf-8'))
        trust_file.flush()

        # create an SSLContext for passing into the kafka provider using the create_default_context
        # function which creates an SSLContext with protocol set to PROTOCOL_SSLv23, OP_NO_SSLv2,
        # and OP_NO_SSLv3 when purpose=SERVER_AUTH.
        ssl_context = ssl.create_default_context(
            purpose=ssl.Purpose.SERVER_AUTH, cafile=trust_file.name)
        ssl_context.load_cert_chain(cert_file.name, keyfile=key_file.name, password=passwd)

        # Intentionally disabling hostname checking.  The Kafka cluster runs in the cloud and Apache
        # Kafka on Heroku doesn't currently provide stable hostnames.  We're pinned to a specific certificate
        # for this connection even though the certificate doesn't include host information.  We rely
        # on the ca trust_cert for this purpose.
        ssl_context.check_hostname = False

    # The temp files are deleted once the with-block exits, but the certs
    # were already loaded into ssl_context above, so returning here is safe.
    return ssl_context

def get_kafka_brokers():
    """
    Parses the KAFKA_URL config var and returns a list of 'hostname:port'
    strings in the format that kafka-python expects.
    """
    # NOTE: The Kafka environment variables need to be present. If using
    # Apache Kafka on Heroku, they will be available in your app configuration.
    kafka_url = os.environ.get('KAFKA_URL')
    if not kafka_url:
        raise RuntimeError('The KAFKA_URL config variable is not set.')

    brokers = []
    for broker_url in kafka_url.split(','):
        parsed = urlparse(broker_url)
        brokers.append('{}:{}'.format(parsed.hostname, parsed.port))
    return brokers

def get_kafka_producer(acks='all',
                       value_serializer=lambda v: json.dumps(v).encode('utf-8')):
    """
    Return a KafkaProducer that uses the SSLContext created with create_ssl_context.
    """
    # Connect to the configured brokers over SSL; values are encoded as
    # UTF-8 JSON by default.
    return KafkaProducer(
        bootstrap_servers=get_kafka_brokers(),
        security_protocol='SSL',
        ssl_context=get_kafka_ssl_context(),
        value_serializer=value_serializer,
        acks=acks,
    )

def get_kafka_consumer(topic=None,
                       value_deserializer=lambda v: json.loads(v.decode('utf-8'))):
    """
    Return a KafkaConsumer that uses the SSLContext created with create_ssl_context.
    """
    # Create the KafkaConsumer connected to the specified brokers over SSL.
    # Values are decoded from UTF-8 JSON by default.
    return KafkaConsumer(
        topic,
        bootstrap_servers=get_kafka_brokers(),
        security_protocol='SSL',
        ssl_context=get_kafka_ssl_context(),
        value_deserializer=value_deserializer,
    )

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题