如何将 Python 程序转换为 PySpark

ecbunoof  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(364)
import json
import requests

from log_conf import Logger
from utils import ES_AUTH, ES_HOST, BASEPATH_FEATURES, FEATURE_SET_NAME, INDEX_NAME
from urllib.parse import urljoin

def get_f(feature_id: int):
    """Load one feature definition from the features directory.

    Each feature is stored in a file named <feature_id>.json
    (1.json, 2.json, ...) under BASEPATH_FEATURES.

    Args:
        feature_id: Numeric id of the feature file to read.

    Returns:
        The parsed JSON document (a dict).

    Raises:
        OSError (IOError): if the numbered file does not exist —
            callers rely on this to detect the end of the sequence.
    """
    # 'with' guarantees the file handle is closed; the original
    # open(...).read() leaked the handle.
    with open(BASEPATH_FEATURES + '%s.json' % feature_id) as fh:
        return json.load(fh)

def each_f():
    """Yield feature specs built from the numbered JSON feature files.

    Iterates 1.json, 2.json, ... via get_f() and stops at the first
    missing file. For each file, derives a human-readable feature name
    from the query structure (match / match_phrase / match_explorer);
    falls back to the numeric id when no known structure matches.

    Yields:
        dict with keys "name", "params", "template" suitable for an
        LTR featureset upload.
    """
    feature_id = 1
    while True:
        try:
            parsed_json = get_f(feature_id)
        except IOError:
            # No more numbered feature files: end of the sequence.
            return

        template = parsed_json['query']

        # BUG FIX: the original reused `feature_id` for the derived
        # string name, so the `feature_id += 1` at the bottom raised
        # TypeError (str + int) on the next iteration. Keep the int
        # counter and the display name in separate variables.
        feature_name = str(feature_id)

        try:
            feature_name = list(parsed_json['query']['match'].keys())[0] + '_score'
        except (KeyError, IndexError):
            pass

        try:
            feature_name = list(parsed_json['query']['match_phrase'].keys())[0] + '_phrase_score'
        except (KeyError, IndexError):
            pass

        try:
            feature_name = parsed_json['query']['match_explorer']['type'] + '_' + \
                           list(parsed_json['query']['match_explorer']['query']['match'])[0] + '_score'
        except (KeyError, IndexError):
            pass

        yield {
            "name": feature_name,
            "params": ["keywords"],
            "template": template,
        }
        feature_id += 1

def load_f(feature_set_name: str):
    """Assemble the feature-set payload and POST it to the feature store.

    Collects every feature spec produced by each_f(), wraps them in the
    validation/featureset envelope expected by the store, and uploads the
    result. Request/response details are logged for debugging.

    Args:
        feature_set_name: Name under which the feature set is stored.
    """
    payload = {
        "validation": {
            "params": {
                "keywords": "some value"
            },
            # Validate the templates against the configured index.
            "index": INDEX_NAME
        },
        "featureset": {
            "name": feature_set_name,
            "features": list(each_f())
        }
    }

    endpoint = urljoin(ES_HOST, "_mtr/_featureset/%s" % feature_set_name)

    Logger.logger.info("POST %s" % endpoint)
    Logger.logger.info(json.dumps(payload, indent=2))

    response = requests.post(
        endpoint,
        data=json.dumps(payload),
        headers={'Content-Type': 'application/json'},
        auth=ES_AUTH,
        verify=False,
    )
    Logger.logger.info("%s" % response.status_code)
    Logger.logger.info("%s" % response.text)

def init_default_store():
    """Initialize the default feature store.

    Resets the store by issuing a DELETE followed by a PUT against the
    store root; status codes are logged for each call.
    """
    store_url = urljoin(ES_HOST, '_mtr')

    # Drop any existing store first (a 404 here is harmless).
    Logger.logger.info("DELETE %s" % store_url)
    delete_resp = requests.delete(store_url, auth=ES_AUTH, verify=False)
    Logger.logger.info("%s" % delete_resp.status_code)

    # Recreate it empty.
    Logger.logger.info("PUT %s" % store_url)
    create_resp = requests.put(store_url, auth=ES_AUTH, verify=False)
    Logger.logger.info("%s" % create_resp.status_code)

if __name__ == "__main__":
    from time import sleep

    init_default_store()
    load_f(FEATURE_SET_NAME)

我是 PySpark 新手,希望将上面的 Python 代码转换为 PySpark。我有以下问题:
这段代码有可能转换为 PySpark 吗?
如果可以,应该怎么做?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题