我已经配置了 Heroku Kafka(Python 版本 3.8),并尝试用下面的代码创建 producer。报错信息如下,随后附上了我的 requirements.txt 和代码。
请问该如何解决这个问题?
File "kafka-producer.py", line 5, in <module>
import kafka_helper
File "/app/kafka_helper.py", line 19, in <module>
from kafka import KafkaProducer, KafkaConsumer
File "/usr/local/lib/python3.8/site-packages/kafka/__init__.py", line 22, in <module>
from kafka.producer import KafkaProducer
File "/usr/local/lib/python3.8/site-packages/kafka/producer/__init__.py", line 4, in <module>
from .simple import SimpleProducer
File "/usr/local/lib/python3.8/site-packages/kafka/producer/simple.py", line 54
return '<SimpleProducer batch=%s>' % self.async
requirements.txt
Flask>=1.1.1
cffi==1.14.4
cryptography==3.2.1
enum34==1.1.10
ipaddress==1.0.23
kafka==1.3.5
kafka-helper==0.1
kafka-python==2.0.2
psycopg2==2.8.6
psycopg2-binary==2.8.6
pycparser==2.20
six==1.15.0
snowflake-connector-python==2.3.7
我的脚本
import snowflake.connector
import os
import json
import ssl
import kafka_helper
from kafka import KafkaProducer # , KafkaConsumer
# SSL context built once at import time from the Kafka config vars; reused by fn_kafka_producer.
V_SSL_CONTEXT = kafka_helper.get_kafka_ssl_context()
# Create Producer Properties
def fn_kafka_producer(acks='all',
                      value_serializer=lambda v: json.dumps(v).encode('utf-8')):
    """
    Build a KafkaProducer that talks to the brokers over SSL.

    Args:
        acks: forwarded unchanged to KafkaProducer as its ``acks`` setting.
        value_serializer: callable applied to every message value before it is
            sent; the default JSON-encodes the value to UTF-8 bytes.

    Returns:
        The newly constructed KafkaProducer.
    """
    # NOTE(review): V_KAFKA_URL is expected to be defined at module level; it is
    # not visible in this snippet -- confirm it holds the broker list.
    kafkaprod = KafkaProducer(
        bootstrap_servers=V_KAFKA_URL,
        # kafka-python defaults to PLAINTEXT and only honours ssl_context when
        # the security protocol is SSL (or SASL_SSL), so it must be set here.
        security_protocol='SSL',
        # key_serializer=key_serializer,
        value_serializer=value_serializer,
        ssl_context=V_SSL_CONTEXT,
        acks=acks
    )
    return kafkaprod
if __name__ == '__main__':
    # Script entry point: build the producer, queue one greeting record on
    # KAFKA_TOPIC, then flush the producer before the script ends.
    PRODUCER = fn_kafka_producer()
    PRODUCER.send(
        KAFKA_TOPIC,
        'Hello Heroku!!',
    )
    PRODUCER.flush()
我的 kafka_helper.py
"""
Helper methods for creating kafka-python KafkaProducer and KafkaConsumer objects.
"""
import os
import json
import ssl
from tempfile import NamedTemporaryFile
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
from base64 import standard_b64encode
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from kafka import KafkaProducer, KafkaConsumer
def get_kafka_ssl_context():
    """
    Build the SSLContext used to connect to the Kafka brokers.

    The client certificate, client private key and trusted CA certificate are
    read from the KAFKA_CLIENT_CERT, KAFKA_CLIENT_CERT_KEY and
    KAFKA_TRUSTED_CERT environment variables, written to named temporary files
    (the ssl module loads them from file paths), and loaded into the context.

    Returns:
        An ssl.SSLContext with the trusted CA and the client certificate chain
        loaded and hostname checking disabled.

    Raises:
        KeyError: if one of the three environment variables is missing.
    """
    # NOTE: The Kafka environment variables are assumed to be present. With
    # Apache Kafka on Heroku they are part of the app configuration.
    env = os.environ

    # The temporary files only need to exist while the SSLContext is being
    # populated; leaving the ``with`` block removes them again.
    with NamedTemporaryFile(suffix='.crt') as client_cert_tmp, \
            NamedTemporaryFile(suffix='.key') as client_key_tmp, \
            NamedTemporaryFile(suffix='.crt') as ca_cert_tmp:
        client_cert_tmp.write(env['KAFKA_CLIENT_CERT'].encode('utf-8'))
        client_cert_tmp.flush()

        # As extra caution the client key is never written in the clear: it is
        # re-serialized encrypted under a random one-off password, and the same
        # password is handed to load_cert_chain below.
        key_password = standard_b64encode(os.urandom(33))
        clear_key = serialization.load_pem_private_key(
            env['KAFKA_CLIENT_CERT_KEY'].encode('utf-8'),
            password=None,
            backend=default_backend(),
        )
        key_cipher = serialization.BestAvailableEncryption(key_password)
        encrypted_key_pem = clear_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=key_cipher,
        )
        client_key_tmp.write(encrypted_key_pem)
        client_key_tmp.flush()

        ca_cert_tmp.write(env['KAFKA_TRUSTED_CERT'].encode('utf-8'))
        ca_cert_tmp.flush()

        # Start from the default server-authentication context, trusting only
        # the CA certificate supplied in the config vars.
        context = ssl.create_default_context(
            purpose=ssl.Purpose.SERVER_AUTH,
            cafile=ca_cert_tmp.name,
        )
        context.load_cert_chain(
            client_cert_tmp.name,
            keyfile=client_key_tmp.name,
            password=key_password,
        )

        # Hostname checking is disabled on purpose: Apache Kafka on Heroku does
        # not currently provide stable hostnames and the certificate carries no
        # host information, so trust rests on the pinned CA certificate.
        context.check_hostname = False

    return context
def get_kafka_brokers():
    """
    Parse the KAFKA_URL config variable into the broker list kafka-python expects.

    KAFKA_URL holds one or more comma-separated broker URLs. Whitespace around
    an entry is ignored and empty entries (for example from a trailing comma)
    are skipped, so they cannot turn into a bogus 'None:None' broker.

    Returns:
        A list of 'hostname:port' strings, one per URL in KAFKA_URL.

    Raises:
        RuntimeError: if KAFKA_URL is unset or empty.
    """
    # NOTE: The Kafka environment variables need to be present. If using
    # Apache Kafka on Heroku, they will be available in your app configuration.
    kafka_url = os.environ.get('KAFKA_URL')
    if not kafka_url:
        raise RuntimeError('The KAFKA_URL config variable is not set.')

    brokers = []
    for raw_url in kafka_url.split(','):
        raw_url = raw_url.strip()
        if not raw_url:
            continue
        parsed_url = urlparse(raw_url)
        brokers.append('{}:{}'.format(parsed_url.hostname, parsed_url.port))
    return brokers
def get_kafka_producer(acks='all',
                       value_serializer=lambda v: json.dumps(v).encode('utf-8')):
    """
    Create a KafkaProducer that connects over SSL.

    The broker list comes from get_kafka_brokers() and the SSLContext from
    get_kafka_ssl_context().

    Args:
        acks: forwarded unchanged to KafkaProducer as its ``acks`` setting.
        value_serializer: callable applied to every message value; the default
            JSON-encodes the value to UTF-8 bytes.

    Returns:
        The newly constructed KafkaProducer.
    """
    broker_list = get_kafka_brokers()
    context = get_kafka_ssl_context()
    return KafkaProducer(
        bootstrap_servers=broker_list,
        security_protocol='SSL',
        ssl_context=context,
        value_serializer=value_serializer,
        acks=acks,
    )
def get_kafka_consumer(topic=None,
                       value_deserializer=lambda v: json.loads(v.decode('utf-8'))):
    """
    Create a KafkaConsumer that connects over SSL.

    The broker list comes from get_kafka_brokers() and the SSLContext from
    get_kafka_ssl_context().

    Args:
        topic: passed to KafkaConsumer as its first positional argument.
        value_deserializer: callable applied to every received message value;
            the default decodes UTF-8 bytes and parses them as JSON.

    Returns:
        The newly constructed KafkaConsumer.
    """
    broker_list = get_kafka_brokers()
    context = get_kafka_ssl_context()
    return KafkaConsumer(
        topic,
        bootstrap_servers=broker_list,
        security_protocol='SSL',
        ssl_context=context,
        value_deserializer=value_deserializer,
    )
暂无答案!
目前还没有任何答案,快来回答吧!