在flink sql cli客户端中将kafka主题作为flink表查看?

bqf10yzr  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(391)

我正在尝试将kafka主题作为flinksqlcli客户机中的表放到apear中。
下面是如何调用sql-client.sh:

./bin/sql-client.sh embedded -l /Users/Behzad.Pirvali/Tools/Data/flink-1.7.2/lib  -d ./conf/sql-client-config-1.yaml

我得到以下例外:

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in
the classpath.

以下是完整的堆栈跟踪:

USHOLBPI1-ML:flink Behzad.Pirvali$ ./bin/sql-client.sh embedded -l /Users/Behzad.Pirvali/Tools/Data/flink-1.7.2/lib  -d ./conf/sql-client-config-1.yaml
Reading default environment from: file:/Users/Behzad.Pirvali/Tools/Data/flink-1.7.2/./conf/sql-client-config-1.yaml
No session environment specified.
Validating current environment...

Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
    at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:140)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:187)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
    at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:488)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:316)
    at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:137)
    ... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in
the classpath.

Reason: No context matches.

The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=localhost:2181
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=localhost:9092
connector.properties.2.key=group.id
connector.properties.2.value=group-scanned-tickets
connector.property-version=1
connector.startup-mode=earliest-offset
connector.topic=scanned-tickets
connector.type=kafka
connector.version=2.2.0
format.property-version=1
format.schema=ROW(venueId LONG, eventName STRING, ticketId LONG, eventStartTime TIMESTAMP, eventTime TIMESTAMP)
format.type=json
schema.0.name=venueId
schema.0.type=LONG
schema.1.name=eventName
schema.1.type=STRING
schema.2.name=ticketId
schema.2.type=LONG
schema.3.name=eventStartTime
schema.3.type=TIMESTAMP
schema.4.name=scannedTime
schema.4.rowtime.timestamps.from=eventTime
schema.4.rowtime.timestamps.type=from-field
schema.4.rowtime.watermarks.delay=60000
schema.4.rowtime.watermarks.type=periodic-bounded
schema.4.type=TIMESTAMP
update-mode=append

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory

    at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:218)
    at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:134)
    at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:100)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.scala)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:236)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$0(ExecutionContext.java:121)
    at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:119)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:484)
    ... 4 more

下面是sql-config.yaml:

tables:
  - name: scanned_tickets
    type: source
    update-mode: append
    schema:
    - name: venueId
      type: LONG
    - name: eventName
      type: STRING
    - name: ticketId
      type: LONG
    - name: eventStartTime
      type: TIMESTAMP
    - name: scannedTime
      type: TIMESTAMP
      rowtime:
        timestamps:
          type: "from-field"
          from: "eventTime"
        watermarks:
          type: "periodic-bounded"
          delay: "60000"
    connector:
      property-version: 1
      type: kafka
      version: 2.2.0
      topic: scanned-tickets
      startup-mode: earliest-offset
      properties:
      - key: zookeeper.connect
        value: localhost:2181
      - key: bootstrap.servers
        value: localhost:9092
      - key: group.id
        value: group-scanned-tickets
    format:
      property-version: 1
      type: json
      schema: "ROW(venueId LONG, eventName STRING, ticketId LONG, eventStartTime TIMESTAMP, eventTime TIMESTAMP)"

# ==============================================================================

# Execution properties

# ==============================================================================

# Execution properties allow for changing the behavior of a table program.

execution:
  type: streaming              # 'batch' or 'streaming' execution
  result-mode: table           # 'changelog' or 'table' presentation of results
  parallelism: 1               # parallelism of the program
  max-parallelism: 128         # maximum parallelism
  min-idle-state-retention: 0  # minimum idle state retention in ms
  max-idle-state-retention: 0  # maximum idle state retention in ms

# ==============================================================================

# Deployment properties

# ==============================================================================

# Deployment properties allow for describing the cluster to which table

# programs are submitted to.

deployment:
  type: standalone             # only the 'standalone' deployment is supported
  response-timeout: 5000       # general cluster communication timeout in ms
  gateway-address: ""          # (optional) address from cluster to gateway
  gateway-port: 0              # (optional) port from cluster to gateway

flink版本是1.7.2,下面是lib目录内容:

-rw-r--r--   1 Behzad.Pirvali  staff     28524 May 30 10:25 flink-json-1.7.2.jar
drwxr-xr-x@  8 Behzad.Pirvali  staff       256 May 30 10:25 .
-rw-r--r--   1 Behzad.Pirvali  staff   1739194 May 30 10:25 flink-connector-kafka-0.11_2.12-1.7.2-sql-jar.jar
drwxr-xr-x@ 12 Behzad.Pirvali  staff       384 Feb 11 06:50 ..
-rw-r--r--@  1 Behzad.Pirvali  staff  84497196 Feb 11 06:50 flink-dist_2.12-1.7.2.jar
-rw-r--r--@  1 Behzad.Pirvali  staff    141942 Feb 11 06:49 flink-python_2.12-1.7.2.jar
-rw-r--r--@  1 Behzad.Pirvali  staff    489884 Feb 11 06:32 log4j-1.2.17.jar
-rw-r--r--@  1 Behzad.Pirvali  staff      9931 Feb 11 06:32 slf4j-log4j12-1.7.15.jar

所以,看起来我丢失了一个jar文件,但我不太清楚是哪一个?

eanckbw9

eanckbw91#

我找到了问题的根源。yaml文件中的配置不匹配:

connector:
      property-version: 1
      type: kafka
      version: 2.2.0

我正在设置代理版本,但是版本必须是0.11才能匹配fink-kafka连接器:flink-connector-kafka-0.11_2.12-1.7.2-sql-jar
因此,将配置更改为以下效果:

connector:
      property-version: 1
      type: kafka
      version: 0.11

相关问题