我在这个奇怪的Pyspark行为破解我的头,在互联网上找不到任何东西。
我们在私有网络中有一个MySQL集群,我们可以通过SSH隧道访问它。我试图使用Pyspark和SSHTunnelForwarder从这个数据库读取数据,我是这样做的:
1.创建隧道:
server = SSHTunnelForwarder(
(target_tunnel_ip_address, 22),
ssh_username=tunnel_username",
ssh_private_key=private_key_filepath",
remote_bind_address=(mysql_address, 3306)
)
server.start()
字符串
1.使用数据库信息创建JDBC URL,如下所示:
hostname = "localhost" #Because I forwarded I forwarded the remote port to my localhost
port = server.local_bind_port #To access which port the SSHTunnelForwarder used
username = my_username
password = my_password
database = my_database
jdbcUrl = "jdbc:mysql://{}:{}/{}?user={}&password={}".format(hostname, port, database, username, password)
型
1.从数据库中阅读数据:
data = spark.read \
.format("jdbc") \
.option("url", jdbcUrl) \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("query", query) \
.load()
型
到目前为止一切顺利,这似乎工作,我可以看到表列:[变量数据输出][1] [1]:https://i.stack.imgur.com/YJhCC.png
DataFrame[id: int, company_id: int, life_id: int, type_id: int, cep: string, address: string, number: int, complement: string, neighborhood: string, city: string, state: string, origin: string, created_at: timestamp, updated_at: timestamp, active: boolean]
型
但是一旦我调用任何实际读取数据的方法,比如.head(),.collect()或其他变体,我就会得到这个错误:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7629.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7629.0 (TID 11996, XX.XXX.XXX.XXX, executor 0): com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
型
有人知道为什么会发生这种情况以及如何解决它吗?
1条答案
按热度按时间dphi5xsq1#
该代码在驱动程序中执行,但任务在执行器上运行,当您引用“localhost”时,每个执行器将自行解释并无法连接。
而是获取驱动程序的主机名(例如socket.gethostname())并在JDBC URL中使用它