Reading the contents of a file with the FileSource connector

92vpleto · published 2021-06-07 in Kafka

How can I read the contents of a file with a Kafka producer? The typical solution found here (piping with `|`) looks dirty and ugly.
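For context, a minimal sketch of the "pipe" approach the question refers to (the broker address and topic name are placeholders, not part of the original question):

```
# Piping a file into the console producer -- the approach the question
# considers ugly. Assumes a broker on localhost:9092 and a topic named
# connect-test. (Older Kafka versions use --broker-list instead of
# --bootstrap-server.)
cat /tmp/test.txt | bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic connect-test
```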


dz6r00yl1#

I recently found a solution that is more suitable than piping file contents into the producer shell: the FileSource connector.
According to the link, the FileSource connector targets exactly the "read a file's data into a producer" use case, e.g. watching the contents of a log file and raising an alert whenever an [ERROR] or [FATAL] entry is encountered.
The full command is (assuming we are in the Kafka root folder):

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
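Before starting the worker, the input file has to exist. A quick way to seed it (the path matches the `file` setting in the source config shown further down; the sample lines are just placeholders):

```
# Seed the file the FileSource connector will tail; /tmp/test.txt matches
# the connect-file-source.properties example below.
echo "hello world" > /tmp/test.txt
echo "[ERROR] something went wrong" >> /tmp/test.txt
```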

Two properties files need to be configured: `config/connect-standalone.properties` and `config/connect-file-source.properties`. The first one defines how the standalone Connect worker connects to Kafka. Something like:


```
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# plugin.path=
```

Very simple. Only two things to pay attention to:
- `bootstrap.servers=localhost:9092`: the Kafka bootstrap servers.
- `(internal.)key/value.converter.schemas.enable=false`: these must be set to `false` so that the lines of the file are parsed as plain strings.
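As a rough illustration of why that matters (assuming the file contains the line `hello world`), the record value written by the JsonConverter looks like this:

```
With schemas.enable=true  : {"schema":{"type":"string","optional":false},"payload":"hello world"}
With schemas.enable=false : "hello world"
```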
The second file is even simpler:

```
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/test.txt
topic=connect-test
```

- `file`: the file to read
- `topic`: the topic to create, which consumers will listen to
That is enough if you want to consume the content with Storm.
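Independently of Storm, a quick way to sanity-check what the connector published is the console consumer (a sketch, reusing the broker and topic names from above):

```
# Read everything the FileSource connector has written to the topic so far.
bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic connect-test \
  --from-beginning
```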
If instead of reading a file you want to write content from Kafka into a file, you can use the FileSink connector. I have not used it myself, but I assume it works the same way, just on the consumer side. Its configuration file is `config/connect-file-sink.properties`.
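For reference, the sink config shipped with Kafka looks roughly like this (not verified here; `test.sink.txt` and `connect-test` are the stock example values):

```
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
```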
