pyspark — How do I configure authentication for Spark Connect?

0yycz8jy · asked 2023-11-16 · in Spark

I recently discovered the new Spark Connect feature in Spark 3.4. I tested it on a YARN cluster and it works great! However, anyone can reach my Spark Connect server and run jobs as my user. **Is it possible to configure authentication for Spark Connect?** A simple password, or a username/password pair, would be enough.
I'm looking for something along these lines:

  • Server side
./sbin/start-connect-server.sh \
--jars jars/spark-connect_2.12-3.4.1.jar \
--master yarn --name SparkConnectTest \
--conf spark.sql.catalogImplementation=hive \
--conf password=mysuperpassword


  • Client side
spark = SparkSession.builder.remote("sc://localhost").config("password", "mysuperpassword").getOrCreate()


The official documentation says:
While Spark Connect does not have built-in authentication, it can work seamlessly with your existing authentication infrastructure. Its gRPC HTTP/2 interface allows the use of authenticating proxies, which makes it possible to secure Spark Connect without implementing authentication logic in Spark directly.
But I don't know how to set up such a gRPC proxy for Spark Connect.

  • Python 3.7.6
  • PySpark 3.4.1
  • Hadoop 3.1.1

zqdjd7g9 1#

gRPC proxy

Proxy the gRPC traffic: gRPC client -> APISIX -> gRPC/gRPCS server
https://apisix.apache.org/docs/apisix/grpc-proxy/
Maybe this can help you.
-- Updated 2023-11-02 --
I set up a simple gRPC example and proxied it through APISIX.

syntax = "proto3";

package user;

service UserService{
  rpc getUserInfo(UserRequest) returns (UserResponse);
}

message UserRequest{
  string id = 1;
}

message UserResponse{
  string id = 1;
  int32 phoneNumber = 2; 
  string email = 3; 
  int32 serialNumber = 4; 
}

Add the route and the authentication via the APISIX Admin API.

# add route
curl http://127.0.0.1:30918/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "methods": ["POST", "GET"],
    "uri": "/user.UserService/getUserInfo",
    "upstream": {
        "scheme": "grpc",
        "type": "roundrobin",
        "nodes": {
            "172.28.208.1:5001": 1
        }
    }
}'

# add key-auth info
curl -i "http://127.0.0.1:30918/apisix/admin/consumers" -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
  "username": "tom",
  "plugins": {
    "key-auth": {
      "key": "secret-key"
    }
  }
}'

# add key-auth plugin to route
curl -i "http://127.0.0.1:30918/apisix/admin/routes/1" -X PATCH -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -d '
{
  "plugins": {
    "key-auth": {}
  }
}'
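If you prefer to script these Admin API calls, the same route-plus-key-auth setup can be expressed in Python using only the standard library. This is a sketch that builds (but does not send) the request; the endpoint, admin key, and upstream address are copied from the curl commands above, so adjust them for your deployment:

```python
import json
import urllib.request

APISIX_ADMIN = "http://127.0.0.1:30918/apisix/admin"  # from the curl examples above
ADMIN_KEY = "edd1c9f034335f136f87ad84b625c8f1"        # APISIX's default demo admin key

def admin_request(path: str, payload: dict, method: str = "PUT") -> urllib.request.Request:
    """Build an (unsent) Admin API request mirroring the curl calls above."""
    return urllib.request.Request(
        f"{APISIX_ADMIN}{path}",
        data=json.dumps(payload).encode(),
        headers={"X-API-KEY": ADMIN_KEY, "Content-Type": "application/json"},
        method=method,
    )

# Route that proxies the gRPC service and enforces key-auth in one step
# (combining the separate PUT and PATCH calls above).
route = admin_request("/routes/1", {
    "methods": ["POST", "GET"],
    "uri": "/user.UserService/getUserInfo",
    "upstream": {
        "scheme": "grpc",
        "type": "roundrobin",
        "nodes": {"172.28.208.1:5001": 1},
    },
    "plugins": {"key-auth": {}},
})
# Send it with urllib.request.urlopen(route) once APISIX is reachable.
```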


When I call the service from a client with no API key, or with a wrong one:

Please input user id: 1
Please input user id: io.grpc.StatusRuntimeException: UNAUTHENTICATED: HTTP status code 401
invalid content-type: text/plain; charset=utf-8
headers: Metadata(:status=401,date=Wed, 01 Nov 2023 03:39:18 GMT,content-type=text/plain; charset=utf-8,server=APISIX/3.6.0)
DATA-----------------------------
{"message":"Missing API key found in request"}

    at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:222)
    at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:203)
    at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:132)
    at com.meritdata.grpc.proto.UserServiceGrpc$UserServiceBlockingStub.getUserInfo(UserServiceGrpc.java:358)
    at com.meritdata.grpc.client.ClientApp.getUserInfo(ClientApp.java:46)
    at com.meritdata.grpc.client.ClientApp.main(ClientApp.java:23)


A gRPC proxy for Spark Connect is coming next.

  • Final update --
    Add the Spark Connect route:
curl http://127.0.0.1:30918/apisix/admin/routes/spark-connect -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "methods": ["POST", "GET"],
    "uri": "/spark.connect.SparkConnectService/*",
    "upstream": {
        "scheme": "grpc",
        "type": "roundrobin",
        "nodes": {
            "10.43.105.147:15002": 1
        }
    },
    "plugins": {
        "key-auth": {}
    }
}'


Use PySpark from Python code:

from pyspark.sql import SparkSession
spark = SparkSession.builder.remote("sc://172.28.220.144:30981/;apikey=secret-key").getOrCreate()
...
spark.stop()
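Why this works: the PySpark client forwards any non-reserved `key=value` pair from the connection string as gRPC metadata on every call, and APISIX's key-auth plugin looks for an `apikey` header by default, so the two match up with no server-side change. A rough sketch of the forwarding rule (the reserved-key list follows my reading of the Spark 3.4 connection-string spec, and `metadata_from_params` is a hypothetical stand-in for what the real `ChannelBuilder` does internally):

```python
# Reserved keys per my reading of the Spark 3.4 connection-string spec;
# the client consumes these itself instead of forwarding them.
RESERVED = {"use_ssl", "token", "user_id", "user_agent"}

def metadata_from_params(params: dict) -> list:
    """Hypothetical helper: every non-reserved URI parameter becomes a
    (key, value) gRPC metadata pair sent with each request."""
    return [(k, v) for k, v in params.items() if k not in RESERVED]

# ";apikey=secret-key" in the URI surfaces as an "apikey" metadata entry,
# which APISIX's key-auth plugin reads as its default header name.
md = metadata_from_params({"apikey": "secret-key", "user_id": "alice"})
```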


If the apikey is missing or has a wrong value when connecting, an error is reported.

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.remote("sc://172.28.220.144:30981/;apikey=secret-key1").getOrCreate()
E1102 14:17:30.511000000 16764 src/core/ext/transport/chttp2/transport/hpack_parser.cc:999] Error parsing 'content-type' metadata: invalid value
...\Miniconda3\envs\python310\lib\site-packages\pyspark\sql\connect\session.py:185: UserWarning: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.UNKNOWN
        details = "Stream removed"
        debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Stream removed", grpc_status:2, created_time:"2023-11-02T06:17:30.5118727+00:00"}"
>
  warnings.warn(str(e))
E1102 14:17:30.546000000 16764 src/core/ext/transport/chttp2/transport/hpack_parser.cc:999] Error parsing 'content-type' metadata: invalid value
...Miniconda3\envs\python310\lib\site-packages\pyspark\sql\connect\session.py:185: UserWarning: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.UNKNOWN
        details = "Stream removed"
        debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2023-11-02T06:17:30.5470989+00:00", grpc_status:2, grpc_message:"Stream removed"}"
>
  warnings.warn(str(e))


The error message is not very specific, though; you can use Wireshark to inspect the actual response.

Internet Protocol Version 4, Src: 172.28.220.144, Dst: 172.28.208.1
Transmission Control Protocol, Src Port: 30981, Dst Port: 59795, Seq: 207, Ack: 837, Len: 77
HyperText Transfer Protocol 2
    Stream: HEADERS, Stream ID: 3, Length 68, 401 Unauthorized
        Length: 68
        Type: HEADERS (1)
        Flags: 0x04, End Headers
        0... .... .... .... .... .... .... .... = Reserved: 0x0
        .000 0000 0000 0000 0000 0000 0000 0011 = Stream Identifier: 3
        [Pad Length: 0]
        Header Block Fragment: 48033430316197df3dbf4a004a693f75040132a01ab8d3b7196d4c5a37ff5f92497ca58a…
        [Header Length: 130]
        [Header Count: 4]
        Header: :status: 401 Unauthorized
        Header: date: Thu, 02 Nov 2023 04:47:35 GMT
        Header: content-type: text/plain; charset=utf-8
        Header: server: APISIX/3.6.0
