如何使用ApacheKafka、AmazonGlue和AmazonS3创建datalake?

0yg35tkg  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(453)

我想将Kafka主题中的所有数据存储到amazons3中。我有一个kafka集群,它在一个主题中每秒接收200.000条消息,每条值消息有50个字段(字符串、时间戳、整数和浮点)。
我的主要想法是使用kafka连接器将数据存储在一个bucket s3中,然后使用amazon glue将数据转换并保存到另一个bucket中。我有接下来的问题:
1) 怎么做?那种架构会很好用吗?我尝试了amazonemr(spark streaming),但是我太担心如何使用apache spark来减少处理时间和失败的任务,因为apachekafka的事件流?
2) 我试图从confluent使用kafka connect,但我有几个问题:
我可以从其他kafka示例连接到我的kafka群集并以独立方式运行kafka connector s3吗?
这个错误“error task s3-sink-0抛出了一个未捕获的
不可恢复的异常“?
错误任务s3-sink-0引发了未捕获且不可恢复的异常(org.apache.kafka.connect.runtime.workertask:142)io.confluent.connect.hdfs.hdfssinktask.close(hdfssinktask)处的java.lang.nullpointerexception。java:122)位于org.apache.kafka.connect.runtime.workersinktask.commitofsets(workersinktask。java:290)在org.apache.kafka.connect.runtime.workersinktask.closepartitions(workersinktask。java:421)在org.apache.kafka.connect.runtime.workersinktask.execute(workersinktask。java:146)在org.apache.kafka.connect.runtime.workertask.dorun(workertask。java:140)在org.apache.kafka.connect.runtime.workertask.run(workertask。java:175)在java.util.concurrent.executors$runnableadapter.call(executors。java:511)在java.util.concurrent.futuretask.run(futuretask。java:266)位于java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor。java:1142)在java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor。java:617)在java.lang.thread.run(线程。java:745) [2018-10-05 15:32:26086]错误任务被终止,在手动重新启动之前不会恢复(org.apache.kafka.connect.runtime.w)orkertask:143)[2018-10-05 15:32:27980]警告无法使用url文件中的目录创建目录:/target。跳过(org.reflections.re公司flections:104)org.reflections.vfs.vfs$defaulturltypes$3.matches上的java.lang.nullpointerexception(vfs。java:239)在org.reflections.vfs.vfs.fromurl(vfs。java:98)在org.reflections.vfs.vfs.fromurl(vfs。java:91)在org.reflections.reflections.scan(reflections。java:237)在org.reflections.reflections.scan(reflections。java:204)在反思。反思。java:129)位于org.apache.kafka.connect.runtime.abstractherder.connectorplugins(abstractherder.com)。java:268)在org.apache.kafka.connect.runtime.abstractherder$1.run(abstractherder.org.apache.kafka.connect.runtime.abstractherder$1.run)。java:377)在java.lang.thread.run(线程。java:745)[2018-10-05 15:32:27981]警告无法从url创建vfs.dir。忽略异常并继续(org.reflections.reflections:208)org.reflections.reflectionsexception:无法从url创建vfs.dir,找不到匹配的urltype[文件:/target]或者使用fromurl(最终url,final list urltypes)或将静态setdefaulturltypes(final list urltypes)或adddefaulturltypes(urltype urltype)与专用的urltype一起使用。在org.reflections.vfs.vfs.fromurl(vfs。java:109)在org.reflections.vfs.vfs.fromurl(vfs。java:91)在org.reflections.reflections.scan(reflections。java:237)在org.reflections.reflections.scan(reflections。java:204)在org.reflections.reflections。java:129)在org.apache.kafka.connect.runtime.abstractherder.connectorplugins(abstractherder。java:268)在org.apache.kafka.connect.runtime.abstractherder$1.run(abstractherder.org.apache.kafka.connect.runtime.abstractherder$1.run)。java:377)在java.lang.thread.run(线程。java:745)[2018-10-05 15:32:35441]信息反射花了12393毫秒扫描429个URL,生成13521个键和95814个值(org.reflections.reflections:229)
如果你能继续连接Kafka的步骤并继续从
另一个Kafka的例子,你会怎么做?
所有这些字段key.converter、value.converter、key.converter.schemas.enable、value.converter.schemas.enable、internal.key.converter、internal.value.converter、internal.key.converter.schemas.enable、internal.value.converter.schemas.enable是什么意思?
key.converter、value.converter的可能值是多少?
3) 一旦我的原始数据在一个桶中,我想使用amazongluke获取这些数据,反序列化protobuffer,更改一些字段的格式,最后将其存储在parquet的另一个桶中。如何在amazon glue中使用自己的java protobuffer库?
4) 如果我想用AmazonAthena进行查询,如何自动加载分区(年、月、日、时)?亚马逊胶水的爬虫程序和调度程序?

tnkciper

tnkciper1#

补充@cricket\u 007的答案
我可以从其他kafka示例连接到我的kafka群集并以独立方式运行kafka connector s3吗?
kafka s3 connector是confluent distribution的一部分,它还包括kafka以及其他相关服务,但它并不打算直接在您的代理上运行,而是:
作为一个独立的辅助进程运行服务启动时给定的连接器配置
或者作为一个额外的工人集群运行在您的Kafka经纪人集群一侧。在这种情况下,通过kafka connect restapi进行连接器的交互/运行会更好(搜索“manageing kafka connectors”以获取示例文档)
如果您可以继续执行连接到kafka的步骤,并从另一个kafka示例继续使用s3,您会怎么做?
你说的是另一个Kafka连接示例吗?
如果是这样的话,您可以简单地在分布式模式下执行kafka connect服务,这意味着提供您想要的可靠性。。。
或者你是说另一个Kafka(经纪人)集群?
在这种情况下,您可以尝试(但那将是实验性的,我自己也没有尝试过…)在独立模式下运行kafka connect并简单地进行更新 bootstrap.servers 连接器配置的参数,以指向新群集。工作原理:在独立模式下,接收器连接器的偏移量本地存储在worker上(与分布式模式相反,在分布式模式下,偏移量直接存储在kafka集群上……)。为什么这可能不起作用:它根本不打算用于此用途,我猜您可能需要您的主题和分区完全相同。。。?
key.converter、value.converter的可能值是多少?
查看confluent的文档,了解kafka-connect-s3;)
如何在amazon glue中使用自己的java protobuffer库?
不确定实际的方法,但胶水作业会在幕后产生一个emr集群,所以我不明白为什么不可能。。。
如果我想用AmazonAthena进行查询,如何自动加载分区(年、月、日、时)?亚马逊胶水的爬虫程序和调度程序?
对。
假设每天都进行分区,实际上您可以安排在早上第一件事运行爬虫程序,只要您希望新数据在s3上创建了当天的文件夹(因此s3上至少存在一个当天的对象)。。。爬虫程序将添加当天的分区,该分区将可用于查询任何新添加的对象。

kyxcudwk

kyxcudwk2#

我们使用s3connect处理数百个主题,并使用hive、athena、spark、presto等处理数据。看起来效果不错,不过我觉得实际的数据库可能会更快地返回结果。
在任何情况下,回答关于连接
我可以从其他kafka示例连接到我的kafka群集并以独立方式运行kafka connector s3吗?
我不确定我是否理解这个问题,但是kafka connect需要连接到一个集群,使用它不需要两个kafka集群。您通常将kafka connect进程作为其自己集群的一部分运行,而不是在代理上运行。
这个错误“error task s3-sink-0抛出了一个未捕获的、不可恢复的异常”是什么意思?
这意味着您需要查看日志,以确定抛出了什么异常,并阻止连接器读取数据。 WARN could not create Dir using directory from url file:/targ ... 如果您使用的是hdfs连接器,我认为您不应该使用默认的文件://uri
如果您可以继续执行连接到kafka的步骤,并从另一个kafka示例继续使用s3,您会怎么做?
你不能“从另一个Kafka示例恢复”。如前所述,connect只能从单个kafka集群进行消费,任何消费的偏移量和消费组都与之存储在一起。
所有这些领域意味着什么
这些字段将从最新的Kafka版本中删除,您可以忽略它们。你绝对不应该改变他们 internal.key.converter,internal.value.converter, internal.key.converter.schemas.enable, internal.value.converter.schemas.enable 这些是您的序列化程序和反序列化程序,就像常规的生产者-消费者api一样 key.converter, value.converter 我相信这些只对json转换器重要。看到了吗https://rmoff.net/2017/09/06/kafka-connect-jsondeserializer-with-schemas-enable-requires-schema-and-payload-fields key.converter.schemas.enable, value.converter.schemas.enable 对protobuf进行反序列化,更改某些字段的格式,最后将其存储在parquet中的另一个bucket中
Kafka连接将需要加载一个protobuf转换器,我不知道有一个(我认为蓝色围裙写的东西。。。搜索github)。
一般来说,avro将更容易转换为parquet,因为本地库已经存在。s3connectbyconfluent目前没有编写parquet格式,但是在一个开放的pr中有一个。另一种方法是使用pinterest secor库。
我不知道胶水,但如果它像Hive,你会用 ADD JAR 在查询过程中加载外部代码插件和函数
我对雅典娜的使用经验很少,但是glue将所有分区作为一个Hive元存储区来维护。自动部分将是爬虫,你可以把一个过滤器上的查询做分区修剪

相关问题