意外字符(“< ”(代码60))

r6hnlfcb  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(540)

我正在尝试执行一个进程,使用connectjdbc从oracledb获取数据,但是在尝试向schemaregistry注册schema时总是失败。这个过程以前在旧版本上工作,但我不知道是我更改了什么,还是api或配置更改了什么。我尝试从confluent schema registry中使用教程中的curl命令成功地运行了一些示例,从connect到schema registry提交了一个命令,从https://docs.confluent.io/6.0.1/schema-registry/develop/using.html,除了我在示例中使用kafka avro控制台生成器时https://rmoff.net/2016/12/02/kafka-avro-console-producer-error-registering-avro-schema-/-io.confluent.kafka.schemaregistry.client.rest.exceptions.restclientexception/,这是同样的错误。
我的 Docker 组成如下:

---
version: '2'
networks:
  stream:
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - stream

  broker:
    image: confluentinc/cp-server:6.0.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
      - "29092:29092"
      - "39092:39092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,QUICKSTART:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://zookeeper:9092,QUICKSTART://quickstart.cloudera:39092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    networks:
      - stream

  schema-registry:
    image: confluentinc/cp-schema-registry:6.0.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081
      SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: full
      SCHEMA_REGISTRY_DEBUG: 'true'
    networks:
      - stream

  connect:
    image: confluentinc/cp-kafka-connect:6.0.1
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.0.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    networks:
      - stream
    command:
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        confluent-hub install --no-prompt jcustenborder/kafka-connect-transform-xml:0.1.0.18
        confluent-hub install --no-prompt streamthoughts/kafka-connect-file-pulse:1.5.0
        confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.3.0
        confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
        # -----------
        # Launch the Kafka Connect worker
        /etc/confluent/docker/run &
        #
        # Don't exit
        sleep infinity
    volumes:
      - $PWD/net.properties:/usr/lib/jvm/jre/conf/net.properties

  control-center:
    image: confluentinc/cp-enterprise-control-center:6.0.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://quickstart.cloudera:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
    networks:
      - stream
  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:6.0.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
    networks:
      - stream

  rest-proxy:
    image: confluentinc/cp-kafka-rest:6.0.1
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
    networks:
      - stream

我的连接配置:

name = quickstart-jdbc-source
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max = 1
config.action.reload = restart
errors.retry.timeout = 0
errors.retry.delay.max.ms = 60000
errors.tolerance = none
errors.log.enable = true
errors.log.include.messages = true
connection.url = jdbc:oracle:thin:@//#######
connection.user = USER
connection.password = PWD
connection.attempts = 3
connection.backoff.ms = 10000
table.whitelist = DB.TABLENAME
numeric.precision.mapping = false
mode = bulk
timestamp.column.name = DTFIELD
validate.non.null = true
quote.sql.identifiers = ALWAYS
table.types = TABLE
poll.interval.ms = 5000
batch.max.rows = 100
table.poll.interval.ms = 60000
topic.prefix = quickstart-jdbc-
timestamp.delay.interval.ms = 0
db.timezone = UTC
value.converter.schema.registry.url = http://schema-registry:8081
key.converter.schema.registry.url = http://schema-registry:8081

记录的错误:

[2020-12-07 13:26:05,336] INFO Using JDBC dialect Oracle (io.confluent.connect.jdbc.source.JdbcSourceTask),
[2020-12-07 13:26:05,336] INFO Attempting to open connection #1 to Oracle (io.confluent.connect.jdbc.util.CachedConnectionProvider),
[2020-12-07 13:26:05,562] INFO Started JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask),
[2020-12-07 13:26:05,563] INFO WorkerSourceTask{id=quickstart-jdbc-source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask),
[2020-12-07 13:26:05,566] INFO Begin using SQL query: SELECT * FROM "DB"."TABLENAME" (io.confluent.connect.jdbc.source.TableQuerier),
[2020-12-07 13:26:05,946] ERROR Error encountered in task quickstart-jdbc-source-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where source record is = /*REMOVED*/,
org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic quickstart-jdbc-TABLENAME :,
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:91),
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63),
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:314),
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156),
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190),
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132),
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:314),
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:340),
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264),
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185),
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235),
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515),
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264),
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128),
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628),
    at java.base/java.lang.Thread.run(Thread.java:834),
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"TABLENAME","fields":[{"name":"FIELD1","type":"string"},{"name":"FIELD2","type":"string"},{"name":"DTFIELD","type":{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}},{"name":"FIELD5","type":{"type":"bytes","scale":0,"precision":64,"connect.version":1,"connect.parameters":{"scale":"0"},"connect.name":"org.apache.kafka.connect.data.Decimal","logicalType":"decimal"}},{"name":"FIELD6","type":"string"},{"name":"FIELD7","type":"string"},{"name":"FIELD8","type":"string"},{"name":"FIELD9","type":{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}}],"connect.name":"TABLENAME"},
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false'),
 at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2]; error code: 50005,
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292),
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352),
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:495),
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:486),
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:459),
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:214),
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:276),
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:252),
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:75),
    at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:143),
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:84),
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63),
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:314),
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156),
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190),
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132),
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:314),
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:340),
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264),
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185),
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235),
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515),
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264),
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128),
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628),
    at java.base/java.lang.Thread.run(Thread.java:834),

根据要求,docker撰写ps输出:

Name                    Command                  State                           Ports
---------------------------------------------------------------------------------------------------------------broker            /etc/confluent/docker/run        Up             0.0.0.0:29092->29092/tcp,
                                                                  0.0.0.0:39092->39092/tcp,
                                                                  0.0.0.0:9092->9092/tcp,
                                                                  0.0.0.0:9101->9101/tcp
connect           bash -c echo "Installing c ...   Up (healthy)   0.0.0.0:8083->8083/tcp, 9092/tcp
control-center    /etc/confluent/docker/run        Up             0.0.0.0:9021->9021/tcp
ksqldb-server     /etc/confluent/docker/run        Up             0.0.0.0:8088->8088/tcp
rest-proxy        /etc/confluent/docker/run        Up             0.0.0.0:8082->8082/tcp
schema-registry   /etc/confluent/docker/run        Up             0.0.0.0:8081->8081/tcp
zookeeper         /etc/confluent/docker/run        Up             0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题