我使用apachespark在以太网通信中查找模式/攻击。我担心spark发送到yarn/hadoop执行节点的数据量。
我在map函数中使用scapy(参见下面的代码)。如果没有安装在执行节点上,是否会将整个模块发送给它们?或者在这种情况下,任务不会被执行?还是以失败告终?有没有办法控制这种行为?
如果我的map函数访问任何全局对象会发生什么?这些物品是运给工人的吗?或者有某种错误/意外行为?
下面是一个示例代码:
# !/usr/bin/python
from pyspark import SparkContext, SparkConf
def ExtractIP(rawEther):
from scapy.layers.inet import Ether, IP
eth = Ether(rawEther)
# May not be IP (for example ARP)
try:
return eth[IP].fields['src']
except:
return '0.0.0.0'
def main():
# Init Spark
conf = SparkConf().setAppName("MyApp").setMaster("local")
sc = SparkContext(conf=conf)
# Load data
cap = sc.sequenceFile("hdfs://master/user/art/Data.seq")
# Get raw Ethernet message
raw_msgs = cap.values()
# Get the source IP address using Scapy
msg_ip = raw_msgs.map(ExtractIP)
# Print the number of target IP messages
print msg_ip.filter(lambda srcIp: srcIp == '10.1.1.100').count()
if __name__ == "__main__":
main()
1条答案
按热度按时间w51jfk4q1#
闭包中引用的所有变量都会自动传递到工作节点,但您必须处理依赖关系。
有多种处理方法:
安装依赖项/放置在
PYTHONPATH
在每个工作节点上使用
pyFiles
创建sparkcontext或addPyFile
现有方法上的方法使用
--py-files
的论点spark-submit
如果依赖关系很大或者需要一些外部库,那么第一种方法可能是最佳的。如果构建自己的模块,您可能更喜欢pyFiles
而不是解决方案。