使用kafka流从kafka主题中包含的日志中提取信息

z4bn682m  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(564)

这是我第一次尝试Kafka流。我用日志正确地创建了一个如下所示的主题:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic example --from-beginning
{"event":"bank.user.patch","ts":"2017-05-11T15:02:53.647+02:00","svc":"dpbank.wb.tlc-1","request":{"ts":"2017-05-11T15:02:53.905+02:00","aw":"c0dc73ae-e903-43c2-bba8-2138d7772945","end_point":"/bank/v1/user/Nqp0a++O4wEKgBCMs35GTw==","method":"PATCH","app_instance":"85164F62-91FA-4FF4-BE8E-1C0BA8F291A9-1488367268","user_agent":"Dy/1.1/iOS/10.1.1/iPhone MDL","user_id":423,"user_ip":"xxxxx","username":"booWlWuCPPltvQgMNiKwrQ==","app_id":"db2ffe87712530981e9871","app_name":"DApp"},"operation":{"scope":"mdpapp","result":{"http_status":200}},"resource":{"object_id":"Ns35GTw==","request_attributes":{"user":{"msisdns":"rTA4G5+h9YA==","selfie":false,"taxcode":"Qz96apGFHlQoV/VtGrJDzZpt6cN4mTxSZs1pEI="}},"response_attributes":{"user":{"sharing_id":"NqpCMs35GTw==","msisdns":"Zsd/v08t6hU9AQV8zXna9Ypv/JITMZj3ulGw=","msisdn_caller":"booWlWuCPPltrQ==","selfie":false,"taxcode":"apGFHlQoV/VtGrJDzZpt6cN4mTxWd+K5SZs1pEI","status":"INCOMPLETE","document_info":{"document_image":false}}}},"class":"DPAPI"}
{"event":"bank.user.patch","ts":"2017-04-07T17:42:31.035+02:00","svc":"dpbank.wb.tlc-1","request":{"ts":"2017-04-07T17:42:31.353+02:00","aw":"99c57-8598-b226af153ab9","end_point":"/ba19XFUV+FA==","method":"PATCH","app_instance":"3558887f-7480-4176-b96c-d989ef1a7aa5-1489492341","user_agent":"Drodroid/5.0.1/Samsung-SM-N910C","user_id":398,"user_ip":"151.14.81.82","username":"dNGqxhJ+4kmmF1h3hgu=","app_id":"db2ffeac6c07712530981e9871","app_name":"DropPayApp"},"operation":{"scope":"mdpapp","result":{"http_status":200}},"resource":{"object_id":"KJl+60+x67JFUV+FA==","request_attributes":{"user":{"sharing_id":"KJl+T619XFUV+FA==","msisdns":[],"firstname":"gR47acZfexoW+HYA==","lastname":"h3gRVpNzavhNu4wQ==","gender":"M","selfie":false,"taxcode":"2INKXPiBeg5acM4nn04S+JrlgJ9rmYHNghUw=","status":"INCOMPLETE","birthinfo":{"city":"Zurigo","date":"1975-06-16","country_id":241},"residence":{"city":"CAIA","address":"Va ello 44","zipcode":"926","country_id":1,"city_id":123}}},"response_attributes":{"user":{"sharing_id":"KJl+60+x67JT619XFUV+FA==","msisdns":[],"firstname":"gR47acZfHdgSGcexoW+HYA==","lastname":"h3MyQR3YgRVpNzavhNu4wQ==","gender":"M","selfie":false,"taxcode":"2INKXPiBeg5acM4nn04S+JrllI6mH2YgJ9rmYHNghUw=","status":"INCOMPLETE","birthinfo":{"city":"Zurigo","date":"1975-06-16","country_id":1},"residence":{"city":"TANIA","address":"Vlo 44","zipcode":"926","country_id":18,"city_id":103},"document_info":{"document_image":false}}}},"class":"DPAPI"}

现在我试着用这个主题来做一些逻辑。
例如,我想将主题中每个日志的一些字段放在 Ktable 然后把它们送到别的地方。
我试过这样做,但不幸的是没有结果
到目前为止,我已经做到了:

public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-userstate");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KStreamBuilder builder = new KStreamBuilder();

        KStream<String, String> source = builder.stream("example");

然后我尝试了ktable,它应该是我的“表”,包含一些从日志中提取的字段。

KTable<String, Long> counts = source.print();

        // need to override value serde to Long type
        counts.to("example-output");

只是看看逻辑是否正确,看看我是否正确地获取日志并将它们放在另一个“输出”主题中。但这样做我有一个 Exception in thread "main" java.lang.NullPointerException ...
我走的路对吗?为了正确阅读Kafka主题,提取一些字段并将它们放在ktable中,我应该怎么做?
谢谢

z31licg0

z31licg01#

首先,如果您实际显示了完整的异常,那么它会有所帮助—为什么以及从何处获得异常 NullPointerException ?
我走的路对吗?为了正确阅读Kafka主题,提取一些字段并将它们放在ktable中,我应该怎么做?
您的一般方法是可以的,但看起来您的代码有一些错误。例如:

KTable<String, Long> counts = source.print();
``` `print()` 退货 `void` ,因此上面的行甚至不编译。也许你应该仔细检查一下你的答案是否包含了所有相关的信息?
我建议大家看一下网站上提供的许多示例和演示应用程序https://github.com/confluentinc/examples. 他们应该给你一些模板作为起点。

相关问题