这是我第一次尝试Kafka流。我用日志正确地创建了一个如下所示的主题:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic example --from-beginning
{"event":"bank.user.patch","ts":"2017-05-11T15:02:53.647+02:00","svc":"dpbank.wb.tlc-1","request":{"ts":"2017-05-11T15:02:53.905+02:00","aw":"c0dc73ae-e903-43c2-bba8-2138d7772945","end_point":"/bank/v1/user/Nqp0a++O4wEKgBCMs35GTw==","method":"PATCH","app_instance":"85164F62-91FA-4FF4-BE8E-1C0BA8F291A9-1488367268","user_agent":"Dy/1.1/iOS/10.1.1/iPhone MDL","user_id":423,"user_ip":"xxxxx","username":"booWlWuCPPltvQgMNiKwrQ==","app_id":"db2ffe87712530981e9871","app_name":"DApp"},"operation":{"scope":"mdpapp","result":{"http_status":200}},"resource":{"object_id":"Ns35GTw==","request_attributes":{"user":{"msisdns":"rTA4G5+h9YA==","selfie":false,"taxcode":"Qz96apGFHlQoV/VtGrJDzZpt6cN4mTxSZs1pEI="}},"response_attributes":{"user":{"sharing_id":"NqpCMs35GTw==","msisdns":"Zsd/v08t6hU9AQV8zXna9Ypv/JITMZj3ulGw=","msisdn_caller":"booWlWuCPPltrQ==","selfie":false,"taxcode":"apGFHlQoV/VtGrJDzZpt6cN4mTxWd+K5SZs1pEI","status":"INCOMPLETE","document_info":{"document_image":false}}}},"class":"DPAPI"}
{"event":"bank.user.patch","ts":"2017-04-07T17:42:31.035+02:00","svc":"dpbank.wb.tlc-1","request":{"ts":"2017-04-07T17:42:31.353+02:00","aw":"99c57-8598-b226af153ab9","end_point":"/ba19XFUV+FA==","method":"PATCH","app_instance":"3558887f-7480-4176-b96c-d989ef1a7aa5-1489492341","user_agent":"Drodroid/5.0.1/Samsung-SM-N910C","user_id":398,"user_ip":"151.14.81.82","username":"dNGqxhJ+4kmmF1h3hgu=","app_id":"db2ffeac6c07712530981e9871","app_name":"DropPayApp"},"operation":{"scope":"mdpapp","result":{"http_status":200}},"resource":{"object_id":"KJl+60+x67JFUV+FA==","request_attributes":{"user":{"sharing_id":"KJl+T619XFUV+FA==","msisdns":[],"firstname":"gR47acZfexoW+HYA==","lastname":"h3gRVpNzavhNu4wQ==","gender":"M","selfie":false,"taxcode":"2INKXPiBeg5acM4nn04S+JrlgJ9rmYHNghUw=","status":"INCOMPLETE","birthinfo":{"city":"Zurigo","date":"1975-06-16","country_id":241},"residence":{"city":"CAIA","address":"Va ello 44","zipcode":"926","country_id":1,"city_id":123}}},"response_attributes":{"user":{"sharing_id":"KJl+60+x67JT619XFUV+FA==","msisdns":[],"firstname":"gR47acZfHdgSGcexoW+HYA==","lastname":"h3MyQR3YgRVpNzavhNu4wQ==","gender":"M","selfie":false,"taxcode":"2INKXPiBeg5acM4nn04S+JrllI6mH2YgJ9rmYHNghUw=","status":"INCOMPLETE","birthinfo":{"city":"Zurigo","date":"1975-06-16","country_id":1},"residence":{"city":"TANIA","address":"Vlo 44","zipcode":"926","country_id":18,"city_id":103},"document_info":{"document_image":false}}}},"class":"DPAPI"}
现在我试着用这个主题来做一些逻辑。
例如,我想将主题中每个日志的一些字段放在 Ktable
然后把它们送到别的地方。
我试过这样做,但不幸的是没有结果
到目前为止,我已经做到了:
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-userstate");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream("example");
然后我尝试了ktable,它应该是我的“表”,包含一些从日志中提取的字段。
KTable<String, Long> counts = source.print();
// need to override value serde to Long type
counts.to("example-output");
只是看看逻辑是否正确,看看我是否正确地获取日志并将它们放在另一个“输出”主题中。但这样做我有一个 Exception in thread "main" java.lang.NullPointerException
...
我走的路对吗?为了正确阅读Kafka主题,提取一些字段并将它们放在ktable中,我应该怎么做?
谢谢
1条答案
按热度按时间z31licg01#
首先,如果您实际显示了完整的异常,那么它会有所帮助—为什么以及从何处获得异常
NullPointerException
?我走的路对吗?为了正确阅读Kafka主题,提取一些字段并将它们放在ktable中,我应该怎么做?
您的一般方法是可以的,但看起来您的代码有一些错误。例如: