创建pyspark的spark上下文py4jjava网关对象

oyt4ldly  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(461)

我正在尝试将javaDataframe转换为pysparkDataframe。为此,我在java进程中创建一个dataframe(或行的数据集),并在java端启动一个py4j.gatewayserver服务器进程。然后在python端,我创建一个py4j.java_gateway.javagateway()客户机对象,并将其传递给pyspark的sparkcontext构造函数,将其链接到已经启动的jvm进程。但我得到了一个错误:-

File: "path_to_virtual_environment/lib/site-packages/pyspark/conf.py", line 120, in __init__
    self._jconf = _jvm.SparkConf(loadDefaults)
TypeError: 'JavaPackage' object is not callable

有人能帮忙吗?下面是我的密码using:-
javacode:-

import py4j.GatewayServer
public class TestJavaToPythonTransfer{
    Dataset<Row> df1;
    public TestJavaToPythonTransfer(){
        SparkSession spark = 
              SparkSession.builder().appName("test1").config("spark.master","local").getOrCreate();
        df1 = spark.read().json("path/to/local/json_file");
    }
    public Dataset<Row> getDf(){
        return df1;  
    }
    public static void main(String args[]){
       GatewayServer gatewayServer = new GatewayServer(new TestJavaToPythonTransfer());
       gatewayServer.start();
       System.out.println("Gateway server started");
    }
}

pythoncode:-

from pyspark.sql import SQLContext, DataFrame
from pyspark import SparkContext, SparkConf
from py4j.java_gateway import JavaGateway
gateway = JavaGateway()
conf = SparkConf().set('spark.io.encryption.enabled','true')
py_sc = SparkContext(gateway=gateway,conf=conf)
j_df = gateway.getDf()
py_df = DataFrame(j_df,SQLContext(py_sc))
print('print dataframe content')
print(dpy_df.collect())

运行python的命令code:-

python path_to_python_file.py

我也试过了this:-

$SPARK_HOME/bin/spark-submit --master local path_to_python_file.py

但在这里,虽然代码没有抛出任何错误,但它没有打印任何东西到终端。我需要为此设置一些spark conf吗?
p、 如果代码或错误中有错别字,请提前道歉,因为我无法直接从公司的ide复制代码和错误堆栈。

mm9b1k5b

mm9b1k5b1#

在调用getdf()之前,缺少对入口点的调用
所以,试试这个:

app = gateway.entry_point
j_df = app.getDf()

此外,我在下面使用python和scala创建了工作副本(希望您不介意),它显示了在scala端py4j网关是如何使用spark会话和示例dataframe启动的,在python端,我访问了该dataframe并在转换回python端spark会话的dataframe之前将其转换为python列表[tuple]:
Python:

from py4j.java_gateway import JavaGateway
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StructField

if __name__ == '__main__':
    gateway = JavaGateway()

    spark_app = gateway.entry_point
    df = spark_app.df()

    # Note "apply" method here comes from Scala's companion object to access elements of an array
    df_to_list_tuple = [(int(i.apply(0)), int(i.apply(1))) for i in df]

    spark = (SparkSession
             .builder
             .appName("My PySpark App")
             .getOrCreate())

    schema = StructType([
        StructField("a", IntegerType(), True),
        StructField("b", IntegerType(), True)])

    df = spark.createDataFrame(df_to_list_tuple, schema)

    df.show()

斯卡拉:

import java.nio.file.{Path, Paths}

import org.apache.spark.sql.SparkSession
import py4j.GatewayServer

object SparkApp {
  val myFile: Path = Paths.get(System.getProperty("user.home") + "/dev/sample_data/games.csv")
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("My app")
    .getOrCreate()

  val df = spark
      .read
      .option("header", "True")
      .csv(myFile.toString)
      .collect()

}

object Py4JServerApp extends App {

  val server = new GatewayServer(SparkApp)
  server.start()

  print("Started and running...")
}

相关问题