I have the following setup: ZooKeeper, Kafka, Schema Registry, and Kafka Connect, each running in its own pod on Kubernetes. I want to drop some JSON files into a folder watched by Kafka Connect with the spooldir connector and have them sent to a specific Kafka topic, as a cold start for our application. Here is the Kubernetes config for Kafka Connect:
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-connect
  labels:
    app.kubernetes.io/name: kafka-connect
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: kafka-connect
  serviceName: kafka-connect-headless
  podManagementPolicy: OrderedReady
  updateStrategy:
    type: OnDelete
  replicas: 1
  template:
    metadata:
      labels:
        helm.sh/chart: kafka-0.21.0
        app.kubernetes.io/name: kafka-connect
    spec:
      terminationGracePeriodSeconds: 10
      volumes:
        # host folder that holds the JSON files for spooldir
        - name: dependency-data
          hostPath:
            path: /dependency/
            type: Directory
      initContainers:
        # block startup until the Schema Registry service resolves
        - name: wait-for-schema-registry
          image: alpine:3.12.0
          command: ['sh', '-c', "until nslookup schema-registry-svc.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local; do echo waiting for schema registry; sleep 2; done"]
      containers:
        - name: kafka-connect
          image: "confluentinc/cp-kafka-connect:5.5.0"
          imagePullPolicy: "IfNotPresent"
          volumeMounts:
            - mountPath: /dependency/
              name: dependency-data
          ports:
            - containerPort: 8083
              name: kafka-connect
              protocol: TCP
          # create the spooldir folders, install the connectors, then start the worker
          command:
            - sh
            - -exc
            - |
              mkdir -p /dependency/unprocessed && \
              mkdir -p /dependency/json/processed && mkdir -p /dependency/json/error && \
              confluent-hub install --no-prompt jcustenborder/kafka-connect-spooldir:2.0.43 && \
              confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:5.5.0 && \
              exec /etc/confluent/docker/run && \
              sleep infinity
          env:
            - name: CONNECT_REST_ADVERTISED_HOST_NAME
              value: "kafka-connect"
            - name: KAFKA_HEAP_OPTS
              value: "-Xms2048M -Xmx2048M"
            - name: CONNECT_BOOTSTRAP_SERVERS
              value: "kafka:9092"
            - name: CONNECT_GROUP_ID
              value: "kafka-connect"
            - name: CONNECT_PLUGIN_PATH
              value: "/usr/share/java,/usr/share/confluent-hub-components/"
            - name: CONNECT_KEY_CONVERTER
              value: "org.apache.kafka.connect.storage.StringConverter"
            - name: CONNECT_VALUE_CONVERTER
              value: "io.confluent.connect.avro.AvroConverter"
            - name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
              value: "http://schema-registry-svc:8081"
            - name: CONNECT_INTERNAL_KEY_CONVERTER
              value: "org.apache.kafka.connect.json.JsonConverter"
            - name: CONNECT_INTERNAL_VALUE_CONVERTER
              value: "org.apache.kafka.connect.json.JsonConverter"
            - name: CONNECT_CONFIG_STORAGE_TOPIC
              value: "kafka-connect-config"
            - name: CONNECT_OFFSET_STORAGE_TOPIC
              value: "kafka-connect-offset"
            - name: CONNECT_STATUS_STORAGE_TOPIC
              value: "kafka-connect-status"
            - name: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
              value: "1"
            - name: CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
              value: "1"
            - name: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
              value: "1"
            - name: CONNECT_LOG4J_ROOT_LOGLEVEL
              value: "DEBUG"
            - name: CONNECT_CONNECT_PROTOCOL
              value: "compatible"
            - name: CONNECT_TOPIC_CREATION_ENABLE
              value: "true"
The spooldir connector config I POST to the Connect REST API:
curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json" \
  http://localhost:8083/connectors -d '{
  "name": "kafka-connect-spooldir",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector",
    "tasks.max": "1",
    "input.path": "/dependency/unprocessed/",
    "input.file.pattern": "^.*\\.json$",
    "error.path": "/dependency/json/error",
    "finished.path": "/dependency/json/processed",
    "auto.register.schemas": "false",
    "schema.generation.enabled": "true",
    "topic": "checklist",
    "value.converter.enhanced.avro.schema.support": "true",
    "topic.creation.enable": "true"
  }
}'
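(After POSTing, a quick sanity check, assuming the connector name above:)

# connector and task state should both be RUNNING
curl -s http://localhost:8083/connectors/kafka-connect-spooldir/status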
There are no exceptions in the Kafka Connect log, and it moves the files from the unprocessed folder to the processed one. But when I try to read the topic with kafka-console-consumer, it reports that it consumed 2 messages yet displays nothing at all (just 2 empty lines, then it hangs). I then registered the checklist schema in the Schema Registry myself and reprocessed the files: same result, except this time kafka-console-consumer reported 4 messages consumed, again showing nothing. I also tried kafka-avro-console-consumer, which consumed 0 messages. I even noticed that Kafka Connect registered a very "dummy" schema for my object:

{"subject":"checklist-value","version":5,"id":1,"schema":"{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"com.github.jcustenborder.kafka.connect.model\",\"fields\":[{\"name\":\"Checklist\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"com.github.jcustenborder.kafka.connect.model.Value\"}"}
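(For reference, these are roughly the consumer and registry calls I mean; the bootstrap server and registry URL are the in-cluster service names from the config above:)

# plain consumer: reports messages consumed but only prints blank lines
kafka-console-consumer --bootstrap-server kafka:9092 --topic checklist --from-beginning

# Avro consumer: consumes 0 messages
kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic checklist --from-beginning \
  --property schema.registry.url=http://schema-registry-svc:8081

# the schema that Connect registered for the topic's value
curl -s http://schema-registry-svc:8081/subjects/checklist-value/versions/latest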
I also checked the Kafka and Schema Registry logs but didn't notice anything wrong there. Probably not needed, but I also tried with "schema.generation.enabled": "false", and then got a java.lang.NullPointerException in the log saying that 'value.schema' must be set when 'schema.generation.enabled' is false. I then tried to convert my existing .avsc into a Connect schema with the kafka-connect-avro-converter, but it turns out there is no proper way to print the generated schema, so I guess that is by design and it doesn't work the way I expected.
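(For reference, a minimal sketch of what the connector seems to want when generation is off, assuming spooldir's own schema JSON format and a single optional string field like the generated schema above; the field layout is my guess, not something I have working:)

"schema.generation.enabled": "false",
"value.schema": "{\"name\":\"com.github.jcustenborder.kafka.connect.model.Value\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"Checklist\":{\"type\":\"STRING\",\"isOptional\":true}}}"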
What am I missing in my Kafka Connect configuration so that it serializes through the Schema Registry and stops sending empty messages to the topic?