我正在尝试连接到本地机器上的kafka(2.1),并在flink(1.7.2)附带的scala shell中读取kafka(2.1)。
下面是我要做的:
:require flink-connector-kafka_2.11-1.7.1.jar
:require flink-connector-kafka-base_2.11-1.7.1.jar
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import java.util.Properties
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()
在最后一个语句之后,我得到以下错误:
scala> var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()
<console>:69: error: overloaded method value addSource with alternatives:
[T](function: org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext[T] => Unit)(implicit evidence$10: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T] <and>
[T](function: org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit evidence$9: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
cannot be applied to (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer[String])
var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()
我已经创建了一个名为“topic”的主题,并且我能够通过另一个客户端正确地生成和读取来自它的消息。我使用的是java版本1.8.0\u201,并遵循https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html .
有什么问题吗?
2条答案
按热度按时间fkaflof61#
有些依赖需要其他的依赖,隐式的。我们通常使用一些依赖关系管理器,比如maven或sbt,当我们向项目中添加一些依赖关系时,依赖关系管理器将在后台提供它的隐式依赖关系。
另一方面,当您使用没有依赖关系管理器的shell时,您负责提供代码依赖关系。使用flink kafka连接器需要
Flink Connector Kafka
但是你应该注意到Flink Connector Kafka
也需要一些依赖关系。您可以在页面底部找到它的依赖项,该页面位于compile dependencies部分。从这个前言开始,我在目录中添加了以下jar文件FLINK_HOME/lib
(flink类路径):我可以使用flink shell中的以下代码成功地使用kafka消息:
此外,向flink类路径添加一些jar文件的另一种方法是将jar作为flink shell start命令的参数传递:
测试环境:
dw1jzc5e2#
在添加源代码之前,很可能应该导入flink的scala隐式: