我编写了一个包含两个函数的小脚本,一个生成随机日志并提供给kafka生产者,另一个使用kafka主题在spark streaming中创建数据流。
我希望这两个函数使用python多处理并发工作,不幸的是,当我运行脚本时,我得到一个与kafkautils.createstream相关的错误。。。
以下是我的终端显示的内容:
~/Desktop/spark_test/kafka_sparkStream/python spark-submit --jars spark-streaming-kafka-assembly_2.10-1.3.0.jar randomLogGenerator.py
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/04/13 11:50:49 INFO SparkContext: Running Spark version 1.3.0
15/04/13 11:50:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/04/13 11:50:49 INFO SecurityManager: Changing view acls to: Mandok
15/04/13 11:50:49 INFO SecurityManager: Changing modify acls to: Mandok
15/04/13 11:50:49 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Mandok); users with modify permissions: Set(Mandok)
15/04/13 11:50:50 INFO Slf4jLogger: Slf4jLogger started
15/04/13 11:50:50 INFO Remoting: Starting remoting
15/04/13 11:50:50 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.235.249.202:52867]
15/04/13 11:50:50 INFO Utils: Successfully started service 'sparkDriver' on port 52867.
15/04/13 11:50:50 INFO SparkEnv: Registering MapOutputTracker
15/04/13 11:50:50 INFO SparkEnv: Registering BlockManagerMaster
15/04/13 11:50:50 INFO DiskBlockManager: Created local directory at /var/folders/n9/3b4rd9wx0v957x03v6h41xnw0000gn/T/spark-212d4f02-8166-4dec-bec3-f3f618ab03bf/blockmgr-7c57842e-99ae-47ac-b408-8587b573f8f5
15/04/13 11:50:50 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
15/04/13 11:50:50 INFO HttpFileServer: HTTP File server directory is /var/folders/n9/3b4rd9wx0v957x03v6h41xnw0000gn/T/spark-4b2527ac-a253-455f-94f2-eef96397426f/httpd-f83cd6dc-0a9d-4bf4-94ca-cad9ab611191
15/04/13 11:50:50 INFO HttpServer: Starting HTTP Server
15/04/13 11:50:50 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/13 11:50:50 INFO AbstractConnector: Started SocketConnector@0.0.0.0:52868
15/04/13 11:50:50 INFO Utils: Successfully started service 'HTTP file server' on port 52868.
15/04/13 11:50:50 INFO SparkEnv: Registering OutputCommitCoordinator
15/04/13 11:50:50 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/13 11:50:50 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/04/13 11:50:50 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/04/13 11:50:50 INFO SparkUI: Started SparkUI at http://10.235.249.202:4040
15/04/13 11:50:51 INFO SparkContext: Added JAR file:/Users/Mandok/Desktop/spark_test/kafka_sparkStream/python/spark-streaming-kafka-assembly_2.10-1.3.0.jar at http://10.235.249.202:52868/jars/spark-streaming-kafka-assembly_2.10-1.3.0.jar with timestamp 1428918651275
15/04/13 11:50:51 INFO Utils: Copying /Users/Mandok/Desktop/spark_test/kafka_sparkStream/python/randomLogGenerator.py to /var/folders/n9/3b4rd9wx0v957x03v6h41xnw0000gn/T/spark-46278e87-f4df-42b2-a666-449ab3c52b24/userFiles-e05fb779-60ee-4cd8-aa6b-1e7bcf51b14e/randomLogGenerator.py
15/04/13 11:50:51 INFO SparkContext: Added file file:/Users/Mandok/Desktop/spark_test/kafka_sparkStream/python/randomLogGenerator.py at file:/Users/Mandok/Desktop/spark_test/kafka_sparkStream/python/randomLogGenerator.py with timestamp 1428918651404
15/04/13 11:50:51 INFO Executor: Starting executor ID <driver> on host localhost
15/04/13 11:50:51 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@10.235.249.202:52867/user/HeartbeatReceiver
15/04/13 11:50:51 INFO NettyBlockTransferService: Server created on 52869
15/04/13 11:50:51 INFO BlockManagerMaster: Trying to register BlockManager
15/04/13 11:50:51 INFO BlockManagerMasterActor: Registering block manager localhost:52869 with 265.1 MB RAM, BlockManagerId(<driver>, localhost, 52869)
15/04/13 11:50:51 INFO BlockManagerMaster: Registered BlockManager
HEY I AM HERE
HEY I AM HERE
Process Process-2:
Traceback (most recent call last):
File "/Users/Mandok/anaconda/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/Users/Mandok/anaconda/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args,**self._kwargs)
File "/Users/Mandok/Desktop/spark_test/kafka_sparkStream/python/randomLogGenerator.py", line 56, in sparkStream
messages = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
File "/usr/local/Cellar/apache-spark/1.3.0/libexec/python/pyspark/streaming/kafka.py", line 69, in createStream
jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel)
File "/usr/local/Cellar/apache-spark/1.3.0/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
self.target_id, self.name)
File "/usr/local/Cellar/apache-spark/1.3.0/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 314, in get_return_value
return OUTPUT_CONVERTER[type](answer[2:], gateway_client)
KeyError: u'o'
Sent 1000 messages
Sent 2000 messages
Sent 3000 messages
Sent 4000 messages
Sent 5000 messages
我的两个职能:
def randomLog():
i = 0
while True:
timestamp = str(datetime.now())
publisher = random.choice(Publishers)
advertiser = random.choice(Advertisers)
website = "website_" + str(random.randint(0, 10000)) + ".com"
cookie = "cookie_" + str(random.randint(0, 10000))
geo = random.choice(Geos)
bid = random.random()
log = '{0}, {1}, {2}, {3}, {4}, {5}, {6}'.format(
timestamp, str(publisher), str(advertiser), website, cookie, geo, str(bid))
producer.send_messages("adnetwork_topic", log)
i += 1
if i % 1000 == 0:
print "Sent %d messages" % i
def sparkStream():
messages = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
lines = messages.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(", ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
if __name__ == '__main__':
process_randomLog = Process(target=randomLog)
process_sparkStream = Process(target=sparkStream)
process_randomLog.start()
process_sparkStream.start()
process_sparkStream.join()
process_randomLog.join()
谢谢你的帮助!
1条答案
按热度按时间vc9ivgsu1#
我通过在sparkstreaming函数中添加流上下文解决了这个问题。