I would like to replace the UDF implementation of the AS lookup with native Spark functions.
The lookup table has roughly 500,000 records and looks like this:
+-----------+------------------------+------------------------------+
| network|autonomous_system_number|autonomous_system_organization|
+-----------+------------------------+------------------------------+
| 1.0.0.0/24| 13335| CLOUDFLARENET|
| 1.0.4.0/22| 38803| Wirefreebroadband...|
|1.0.16.0/24| 2519| ARTERIA Networks ...|
|1.0.64.0/18| 18144| Energia Communica...|
+-----------+------------------------+------------------------------+
The incoming data contains source and destination IP addresses:
+-----------------+----------------------+
|sourceIPv4Address|destinationIPv4Address|
+-----------------+----------------------+
| 5.34.180.207| 213.226.224.51|
| 13.230.21.159| 81.95.96.2|
| 93.188.0.21| 34.223.46.15|
| 165.22.170.210| 89.187.156.222|
+-----------------+----------------------+
Since the lookup table only contains IP subnets (CIDR blocks), I probably need a longest-prefix-match algorithm to join the two DataFrames, but I don't know how to implement that in Spark without yet another UDF. Can someone point me in the right direction?
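To make the direction I'm thinking of more concrete, here is a rough, untested sketch of a range-based join that uses only native column functions: each CIDR is expanded into a numeric start/end address and the incoming addresses are joined with a BETWEEN condition. The names lookupDf and ipv4ToLong are my own, it only handles the source address, and it does not yet pick the longest (most specific) prefix when subnets overlap, which is exactly the part I'm unsure about:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

// Build a numeric value from an IPv4 string column using only native functions (no UDF).
def ipv4ToLong(ip: Column): Column = {
  val octets = split(ip, "\\.")
  octets.getItem(0).cast("long") * 16777216L +
    octets.getItem(1).cast("long") * 65536L +
    octets.getItem(2).cast("long") * 256L +
    octets.getItem(3).cast("long")
}

// Expand every CIDR in the lookup table into its numeric [startIp, endIp] range.
val ranges = lookupDf
  .withColumn("prefixLen", split(col("network"), "/").getItem(1).cast("int"))
  .withColumn("startIp", ipv4ToLong(split(col("network"), "/").getItem(0)))
  .withColumn("endIp",
    col("startIp") + pow(lit(2), lit(32) - col("prefixLen")).cast("long") - 1)

// Non-equi join: match a flow to every subnet whose range contains the source address.
val joined = df.join(
  ranges,
  ipv4ToLong(col("sourceIPv4Address")).between(col("startIp"), col("endIp")),
  "left")

I'm also not sure whether a non-equi join against ~500,000 ranges like this is even viable performance-wise.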
At the moment I use MaxMind as the lookup library:
import java.net.InetAddress
import org.apache.spark.sql.functions.{col, udf}

// Wrap the MaxMind lookup in a UDF and add the AS number for both addresses.
val asnLookup = udf(Udf.getASN)
val dfAS = df
  .withColumn("sourceAS", asnLookup(col(IpfixFields.sourceIPv4Address)))
  .withColumn("destinationAS", asnLookup(col(IpfixFields.destinationIPv4Address)))

// Look up the autonomous system number of an IP address in the MaxMind ASN database,
// returning "na" when no record is found.
def getASN: String => String = (ipAddress: String) => {
  val ia = InetAddress.getByName(ipAddress)
  val result = GeoIPWrapper.reader.tryAsn(ia)
  if (!result.isPresent) {
    "na"
  } else {
    result.get().getAutonomousSystemNumber.toString
  }
}