siddhi-cep和kafka

ki0zmccv  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(432)

我目前正在整合wso2的siddhicep和kafka。我想通过接收Kafka的事件来产生一个siddhi流。接收到的kafka数据是json格式的,其中每个事件如下所示:

{  
   "event":{  
      "orderID":"1532538588320",
      "timestamps":[  
         15325,
         153
      ],
      "earliestTime":1532538
   }
}

我尝试在wso2流处理器中运行的siddhiapp如下所示:

@App:name('KafkaSiddhi')
@App:description('Consume events from a Kafka Topic and print the output.')

-- Streams
@source(type='kafka', 
topic.list = 'order-aggregates',
partition.no.list = '0',
threading.option = 'single.thread',
group.id = 'time-aggregates',
bootstrap.servers = 'localhost:9092, localhost:2181',
@map(type='json'))
define stream TimeAggregateStream (orderID string,timestamps 
object,earliestTime long);

@sink(type="log")
define stream TimeAggregateResultStream (orderID string, timestamps 
object, earliestTime long);

-- Queries
from TimeAggregateStream 
select orderID, timestamps, earliestTime
insert into TimeAggregateResultStream;

运行这个应用程序应该按照我正在侦听的kafka集群的顺序记录所有正在更新的数据。但我看不到任何输出时,点击运行。
我可以看出wso2流处理器和order aggregates主题之间存在某种类型的交互,因为每当我使用与流模式不一致的数据类型运行应用程序时,都会实时输出错误消息。错误消息如下所示:

[2018-07-25_10-14-37_224] ERROR 
{org.wso2.extension.siddhi.map.json.sourcemapper.JsonSourceMapper} - 
Json message {"event":{"orderID":"210000000016183","timestamps": 
[1532538627000],"earliestTime":1532538627000}} contains incompatible 
attribute types and values. Value 210000000016183 is not compatible with 
type LONG. Hence dropping the message. (Encoded)

但是,如果模式设置正确,则在运行应用程序时根本不会收到任何输出。我真的不知道怎么理解这个。当我试图通过在包含“insert into”的行中插入断点来调试它时,调试器从不在该行停止。
有人能就如何处理这个问题提供一些见解吗?

ldxq2e6h

ldxq2e6h1#

我们在最新版本的扩展中添加了对jsonMap器扩展的对象支持。请下载扩展名[1]并替换/lib中的siddhi map json jar。
[1] https://store.wso2.com/store/assets/analyticsextension/details/0e6a6b38-f1d1-49f5-a685-e8c16741494d
祝你好运,拉明杜。

相关问题