我在databricks中有一个用例,其中必须对url的数据集进行api调用。这个数据集有大约10万条记录。允许的最大并发数为3。
我在scala中实现,并在databricks笔记本中运行。除了队列中的一个待处理元素之外,我觉得这里缺少了一些东西。
阻塞队列和线程池是否是解决此问题的正确方法。
在下面的代码中,我修改了数据集,而不是从数据集中读取数据,而是在seq上采样。任何帮助/想法都将不胜感激。
import java.time.LocalDateTime
import java.util.concurrent.{ArrayBlockingQueue,BlockingQueue}
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit;
var inpQueue:BlockingQueue[(Int, String)] = new ArrayBlockingQueue[(Int, String)](1)
val inpDS = Seq((1,"https://google.com/2X6barD"), (2,"https://google.com/3d9vCgW"), (3,"https://google.com/2M02Xz0"), (4,"https://google.com/2XOu2uL"), (5,"https://google.com/2AfBWF0"), (6,"https://google.com/36AEKsw"), (7,"https://google.com/3enBxz7"), (8,"https://google.com/36ABq0x"), (9,"https://google.com/2XBjmiF"), (10,"https://google.com/36Emlen"))
val pool = Executors.newFixedThreadPool(3)
var i = 0
inpDS.foreach{
ix => {
inpQueue.put(ix)
val t = new ConsumerAPIThread()
t.setName("MyThread-"+i+" ")
pool.execute(t)
}
i = i+1
}
println("Final Queue Size = " +inpQueue.size+"\n")
class ConsumerAPIThread() extends Thread
{
var name =""
override def run()
{
val urlDetail = inpQueue.take()
print(this.getName()+" "+ Thread.currentThread().getName() + " popped "+urlDetail+" Queue Size "+inpQueue.size+" \n")
triggerAPI((urlDetail._1, urlDetail._2))
}
def triggerAPI(params:(Int,String)){
try{
val result = scala.io.Source.fromURL(params._2)
println("" +result)
}catch{
case ex:Exception => {
println("Exception caught")
}
}
}
def ConsumerAPIThread(s:String)
{
name = s;
}
}
2条答案
按热度按时间mklgxw1f1#
因此,您有两个要求:功能性要求是您希望异步处理列表中的项,非功能性要求是您不希望同时处理三个以上的项。
关于后者,好的方面是,正如您在问题中已经展示的那样,java本机公开了一个很好的封装
Executor
它以固定大小在线程池上运行任务,允许您在处理线程时优雅地限制并发级别。转到功能性需求,scala的标准api中包含了一些可以精确实现这一点的功能。特别是它使用
scala.concurrent.Future
,所以为了使用它,我们必须重新构造triggerAPI
依据Future
. 函数的内容不是特别相关,所以我们现在主要关注它的(修订)签名:注意现在
triggerAPI
返回一个Future
. 一Future
可以认为是一个读句柄的东西,将最终计算。特别是,这是一个Future[Unit]
,在哪里Unit
代表“我们并不特别关心这个函数的输出,但主要关心它的副作用”。此外,请注意,该方法现在接受一个隐式参数,即
ExecutionContext
. 这个ExecutionContext
用于提供Future
在某种形式的环境中进行计算。scala有一个api来创建ExecutionContext
从java.util.concurrent.ExecutorService
,因此这对于在固定线程池上运行我们的计算非常方便,在任何给定的时间运行不超过三次回调。在前进之前,如果你对
Future
是的,ExecutionContext
scala文档是您最好的知识来源(这里有几个指针:1,2)。现在我们有了新的
triggerAPI
方法,我们可以使用Future.traverse
(以下是Scala2.12的文档——撰写本文时的最新版本是2.13,但据我所知,spark用户暂时还停留在2.12上)。tl;博士
Future.traverse
它采用某种形式的容器和函数,该函数接受容器中的项并返回Future
其他的东西。该函数将应用于容器中的每个项目,结果将是Future
结果容器的。在您的情况下:容器是一个List
,项目为(Int, String)
而你所归还的其他东西是Unit
.这意味着你可以这样称呼它:
以及
triggerAPI
将应用于中的每个项目inpDS
.通过确保线程池支持的执行上下文在调用
Future.traverse
,将使用所需的线程池处理这些项。通话结果是
Future[List[Unit]]
,这不是很有趣,可以简单地丢弃(因为你只对副作用感兴趣)。这是一个很大的谈话,如果你想玩我描述的代码,你可以这样做在这里的斯卡斯蒂。
作为参考,这是整个实施过程:
sycxhyv72#
你需要关掉电脑
Executor
在你的工作完成之后,它将等待。尝试添加
pool.shutdown()
你的节目结束了。