I want to use this repo to upload testdata.csv to a Kafka topic. Right now this curl command fails to create the custom connector:

curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-00/config -d @sample.json

Here is my docker-compose:
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:6.2.0
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
      # An important note about accessing Kafka from clients on other machines:
      # -----------------------------------------------------------------------
      #
      # The config used here exposes port 9092 for _external_ connections to the broker
      # i.e. those from _outside_ the docker network. This could be from the host machine
      # running docker, or maybe further afield if you've got a more complicated setup.
      # If the latter is true, you will need to change the value 'localhost' in
      # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
      # remote clients
      #
      # For connections _internal_ to the docker network, such as from other services
      # and components, use broker:29092.
      #
      # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
      # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
      #
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
  schema-registry:
    image: confluentinc/cp-schema-registry:6.2.0
    container_name: schema-registry
    ports:
      - "8081:8081"
    depends_on:
      - broker
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:29092
  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:6.2.0
    container_name: kafka-connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      # ---------------
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
    # If you want to use the Confluent Hub installer to d/l components, but make them available
    # when running this offline, spin up the stack once and then run:
    #   docker cp kafka-connect:/usr/share/confluent-hub-components ./data/connect-jars
    volumes:
      - $PWD/data:/data
    # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
    command:
      - bash
      - -c
      - |
        echo "Installing Connector"
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0
        confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:11.1.3
        confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:2.0.0
        confluent-hub install --no-prompt jcustenborder/kafka-connect-spooldir:2.0.60
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity
  ksqldb:
    # *-----------------------------*
    # To connect to ksqlDB CLI:
    #   docker exec --interactive --tty ksqldb ksql http://localhost:8088
    # *-----------------------------*
    image: confluentinc/ksqldb-server:0.21.0
    container_name: ksqldb
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:29092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_STREAMS_PRODUCER_MAX_BLOCK_MS: 9223372036854775807
      KSQL_KSQL_CONNECT_URL: http://kafka-connect:8083
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_KSQL_HIDDEN_TOPICS: '^_.*'
  control-center:
    image: confluentinc/cp-enterprise-control-center:6.2.0
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT_CLUSTER: 'kafka-connect:8083'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_KSQL_KSQLDB_URL: "http://ksqldb:8088"
      # The advertised URL needs to be the URL on which the browser
      # can access the KSQL server (e.g. http://localhost:8088/info)
      CONTROL_CENTER_KSQL_KSQLDB_ADVERTISED_URL: "http://localhost:8088"
      # -v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v
      # Useful settings for development/laptop use - modify as needed for Prod
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_COMMAND_TOPIC_REPLICATION: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_REPLICATION: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_INTERNAL_TOPICS_REPLICATION: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONTROL_CENTER_STREAMS_NUM_STREAM_THREADS: 1
      CONTROL_CENTER_STREAMS_CACHE_MAX_BYTES_BUFFERING: 104857600
    command:
      - bash
      - -c
      - |
        echo "Waiting two minutes for Kafka brokers to start and
        necessary topics to be available"
        sleep 120
        /etc/confluent/docker/run
  # Other systems
  mysql:
    # *-----------------------------*
    # To connect to the DB:
    #   docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
    # *-----------------------------*
    image: mysql:8.0
    container_name: mysql
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
    volumes:
      - ${PWD}/data/mysql:/docker-entrypoint-initdb.d
      - ${PWD}/data:/data
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.10.1
    container_name: elasticsearch
    hostname: elasticsearch
    ports:
      - 9200:9200
    environment:
      xpack.security.enabled: "false"
      ES_JAVA_OPTS: "-Xms1g -Xmx1g"
      discovery.type: "single-node"
  kibana:
    image: docker.elastic.co/kibana/kibana:7.10.1
    container_name: kibana
    hostname: kibana
    depends_on:
      - elasticsearch
    ports:
      - 5601:5601
    environment:
      xpack.security.enabled: "false"
      discovery.type: "single-node"
    command:
      - bash
      - -c
      - |
        /usr/local/bin/kibana-docker &
        echo "Waiting for Kibana to be ready ⏳"
        while [ $$(curl -H 'kbn-xsrf: true' -s -o /dev/null -w %{http_code} http://localhost:5601/api/saved_objects/_find?type=index-pattern&search_fields=title&search=*) -ne 200 ] ; do
          echo -e "\t" $$(date) " Kibana saved objects request response: " $$(curl -H 'kbn-xsrf: true' -o /dev/null -w %{http_code} -s http://localhost:5601/api/saved_objects/_find?type=index-pattern&search_fields=title&search=*) $$(curl -H 'kbn-xsrf: true' -s http://localhost:5601/api/saved_objects/_find?type=index-pattern&search_fields=title&search=*) " (waiting for 200)"
          sleep 5
        done
        echo -e "\t" $$(date) " Kibana saved objects request response: " $$(curl -H 'kbn-xsrf: true' -o /dev/null -w %{http_code} -s http://localhost:5601/api/saved_objects/_find?type=index-pattern&search_fields=title&search=*) $$(curl -H 'kbn-xsrf: true' -s http://localhost:5601/api/saved_objects/_find?type=index-pattern&search_fields=title&search=*)
        echo -e "\n--\n+> Pre-creating index pattern"
        curl -s -XPOST 'http://localhost:5601/api/saved_objects/index-pattern/mysql-debezium-asgard.demo.orders' \
          -H 'kbn-xsrf: nevergonnagiveyouup' \
          -H 'Content-Type: application/json' \
          -d '{"attributes":{"title":"mysql-debezium-asgard.demo.orders","timeFieldName":"CREATE_TS"}}'
        echo -e "\n--\n+> Setting the index pattern as default"
        curl -s -XPOST 'http://localhost:5601/api/kibana/settings' \
          -H 'kbn-xsrf: nevergonnagiveyouup' \
          -H 'content-type: application/json' \
          -d '{"changes":{"defaultIndex":"mysql-debezium-asgard.demo.orders"}}'
        echo -e "\n--\n+> Opt out of Kibana telemetry"
        curl 'http://localhost:5601/api/telemetry/v2/optIn' \
          -H 'kbn-xsrf: nevergonnagiveyouup' \
          -H 'content-type: application/json' \
          -H 'accept: application/json' \
          --data-binary '{"enabled":false}' \
          --compressed
        sleep infinity
  neo4j:
    image: neo4j:4.2.3
    container_name: neo4j
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      NEO4J_AUTH: neo4j/connect
      NEO4J_dbms_memory_heap_max__size: 8G
      NEO4J_ACCEPT_LICENSE_AGREEMENT: 'yes'
  kafkacat:
    image: edenhill/kafkacat:1.6.0
    container_name: kafkacat
    entrypoint:
      - /bin/sh
      - -c
      - |
        apk add jq;
        while [ 1 -eq 1 ];do sleep 60;done
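Note the volume mapping "- $PWD/data:/data" on the kafka-connect service: whatever $PWD resolves to on the host is mounted at /data inside the container, and the connector's path settings have to refer to that container-side mount. A quick way to check what the mount actually resolved to (a sketch, assuming the stack is up and the container is named kafka-connect):

    docker inspect kafka-connect --format "{{ json .Mounts }}"
    docker exec kafka-connect ls -la /data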
Here is my sample.json:
{
"connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
"topic": "orders_spooldir_00",
"input.path": "/data/unprocessed",
"finished.path": "/data/processed",
"error.path": "/data/error",
"input.file.pattern": ".*\\.csv",
"schema.generation.enabled":"true",
"csv.first.row.as.header":"true"
}
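For reference, once the connector is created successfully, its state can be checked via the worker's status endpoint (a sketch, assuming the connector name used in the PUT above):

    curl -s http://localhost:8083/connectors/source-csv-spooldir-00/status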
And here is the output:
C:\Users\vinee\Downloads\kafka-connect\demo-scene-master\kafka-connect-zero-to-hero>curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-00/config -d @csv-connector.json
HTTP/1.1 400 Bad Request
Date: Sat, 04 Nov 2023 10:14:54 GMT
Content-Type: application/json
Content-Length: 381
Server: Jetty(9.4.40.v20210413)
{"error_code":400,"message":"Connector configuration is invalid and contains the following 2 error(s):\nInvalid value '/data/unprocessed' must be a directory. for configuration input.path\nInvalid value '/data/error' must be a directory. for configuration error.path\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}
or
{"error_code":400,"message":"Connector configuration is invalid and contains the following 2 error(s):\nInvalid value File 'C:/Users/vinee/Downloads/kafka-connect/demo-scene-master/kafka-connect-zero-to-hero/data/unprocessed' is not an absolute path. for configuration input.path\nInvalid value File 'C:/Users/vinee/Downloads/kafka-connect/demo-scene-master/kafka-connect-zero-to-hero/data/error' is not an absolute path. for configuration error.path\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}
I have tried every value I could think of for input.path, finished.path, and error.path, but the output doesn't change, apart from the path echoed back after "Invalid value". Among the values I tried:
C:\Users\vinee\Downloads\kafka-connect\demo-scene-master\kafka-connect-zero-to-hero\data\unprocessed
C://Users//vinee//Downloads//kafka-connect//demo-scene-master//kafka-connect-zero-to-hero//data//unprocessed
dir://C://Users//vinee//Downloads//kafka-connect//demo-scene-master//kafka-connect-zero-to-hero//data//unprocessed
/c:/Users/vinee/Downloads/kafka-connect/demo-scene-master/kafka-connect-zero-to-hero/data/unprocessed
.\data\unprocessed
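None of these Windows-style values can work, because the Kafka Connect worker runs inside the kafka-connect container and resolves input.path there; the value has to be the container-side path of the volume mount (e.g. /data/unprocessed), and the directories have to already exist in the container. A sketch, assuming the $PWD/data:/data mapping from the compose file resolved correctly. On the Windows host, next to docker-compose.yaml:

    mkdir data\unprocessed data\processed data\error

Then check that the directories are visible inside the container:

    docker exec kafka-connect ls /data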
Here is the data folder:
[screenshot of the data directory]
1 Answer
In docker-compose.yaml, change PWD to C://Users//vinee//Downloads//kafka-connect//demo-scene-master//kafka-connect-zero-to-hero, and in the connector config change C://Users//vinee//Downloads//kafka-connect//demo-scene-master//kafka-connect-zero-to-hero//data//unprocessed to /data/unprocessed. With that I got a 201 status. Thanks to @OneCricketeer for the help.
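Putting the two changes together, roughly (a sketch; the hardcoded host path replaces $PWD, which a Windows shell does not resolve the same way):

    # kafka-connect service in docker-compose.yaml:
    volumes:
      - C://Users//vinee//Downloads//kafka-connect//demo-scene-master//kafka-connect-zero-to-hero/data:/data

sample.json keeps the container-side paths (input.path /data/unprocessed, finished.path /data/processed, error.path /data/error). After recreating the kafka-connect container so the new mount takes effect, the original request should return HTTP/1.1 201 Created:

    curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-00/config -d @sample.json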