我有一个pyspark应用程序,它配置了2个数据流:
数据流a)从kafka读取(主题1和主题2)->合并主题dstream->do stuff->输出到redis。
数据流b)从kafka读取(主题3、4和5)->合并主题dstream->do stuff->输出到相同的redis。
我只能在redis中配置一个,但不能同时配置两个。
实际上,即使我将redis output更改为pprint,也会发生这种情况。只有当两个流中只有一个处于活动状态时,才会打印输出。
我错过了什么?
我有一个pyspark应用程序,它配置了2个数据流:
数据流a)从kafka读取(主题1和主题2)->合并主题dstream->do stuff->输出到redis。
数据流b)从kafka读取(主题3、4和5)->合并主题dstream->do stuff->输出到相同的redis。
我只能在redis中配置一个,但不能同时配置两个。
实际上,即使我将redis output更改为pprint,也会发生这种情况。只有当两个流中只有一个处于活动状态时,才会打印输出。
我错过了什么?
2条答案
按热度按时间sz81bmfz1#
你想加入2个数据流吗?我是说第一个数据流指向topica,返回(k,v),第二个数据流监视topicb,返回(k,v1)。如果是,那么可以使用join()方法连接2dstream,该方法将返回(k(v,v1))。
或者您想在创建流(kafkautils.createstream())时传递topica、topicb,然后将(k,v)后跟(k,v1)插入redis?
zzlelutf2#
问题是可用的执行线程数。
spark文档中说,每个接收器都使用一个专用线程来获取数据。我在本地模式下运行这个应用程序,主url为“local[*]”,这给了我4个线程,因为我的电脑有4个内核。
由于我的应用程序配置了5个不同的kafka主题,我需要至少6个线程(5+1驱动程序)才能运行我的程序。因为我只有4个,而且数据流被合并到下游,所以计算永远不会开始。
解决方案:
MASTER_URL = local[10]