import json
import requests
from log_conf import Logger
from utils import ES_AUTH, ES_HOST, BASEPATH_FEATURES, FEATURE_SET_NAME, INDEX_NAME
from urllib.parse import urljoin
def get_f(feature_id: int):
    """Load and parse the JSON definition of a single feature.

    Each feature is stored in its own file named after the feature number
    (``1.json``, ``2.json``, ...); the file name is appended directly to
    ``BASEPATH_FEATURES``.

    Args:
        feature_id: Number of the feature, used to build the file name.

    Returns:
        The decoded JSON content of the feature file.

    Raises:
        OSError: If the file cannot be opened (for example, it is missing).
        ValueError: If the file content is not valid JSON.
    """
    # The context manager closes the handle deterministically; the previous
    # ``open(...).read()`` one-liner never closed it explicitly.
    with open(BASEPATH_FEATURES + '%s.json' % feature_id) as feature_file:
        return json.load(feature_file)
def _feature_name(query, default):
    """Derive a feature name from the shape of *query*, or return *default*.

    Three query shapes are probed in order (``match``, ``match_phrase``,
    ``match_explorer``). Every shape that applies overrides the previous
    result, so the last applicable one wins.
    """
    builders = (
        lambda: list(query['match'].keys())[0] + '_score',
        lambda: list(query['match_phrase'].keys())[0] + '_phrase_score',
        lambda: (query['match_explorer']['type'] + '_'
                 + list(query['match_explorer']['query']['match'])[0]
                 + '_score'),
    )
    name = default
    for build in builders:
        try:
            name = build()
        except (KeyError, IndexError, TypeError, AttributeError):
            # The query simply does not have this shape; keep the current name.
            continue
    return name


def each_f():
    """Yield one feature spec per consecutive feature file, starting at 1.

    Files are read through ``get_f`` with increasing numbers until one cannot
    be opened, which ends the iteration.

    Yields:
        dict: ``{"name": ..., "params": ["keywords"], "template": ...}`` where
        ``template`` is the file's ``query`` object and ``name`` is derived
        from the query shape, falling back to the feature number as a string.

    Raises:
        KeyError: If a feature file has no ``query`` entry.
    """
    # The file counter is kept separate from the derived name: reusing one
    # variable for both made ``+= 1`` fail with TypeError (str + int) as soon
    # as a name had been derived, aborting the generator after one feature.
    feature_number = 1
    while True:
        try:
            parsed_json = get_f(feature_number)
        except IOError:
            # The first missing/unreadable file marks the end of the sequence.
            return
        template = parsed_json['query']
        yield {
            "name": _feature_name(template, "%s" % feature_number),
            "params": ["keywords"],
            "template": template,
        }
        feature_number += 1
def load_f(feature_set_name: str):
    """Build a feature set from ``each_f()`` and POST it to the server.

    The request goes to ``_mtr/_featureset/<feature_set_name>`` resolved
    against ``ES_HOST``, authenticated with ``ES_AUTH`` and sent with
    certificate verification turned off. The target URL, the pretty-printed
    payload, the response status code and the response text are all logged.

    Args:
        feature_set_name: Name of the feature set; used both inside the
            payload and as the last segment of the request path.
    """
    validation_section = {
        "params": {"keywords": "some value"},
        "index": INDEX_NAME,
    }
    featureset_section = {
        "name": feature_set_name,
        # Drain the generator up front so the payload is a plain list.
        "features": list(each_f()),
    }
    payload = {
        "validation": validation_section,
        "featureset": featureset_section,
    }

    endpoint = urljoin(ES_HOST, "_mtr/_featureset/%s" % feature_set_name)
    log = Logger.logger
    log.info("POST %s" % endpoint)
    log.info(json.dumps(payload, indent=2))

    response = requests.post(
        endpoint,
        data=json.dumps(payload),
        headers={'Content-Type': 'application/json'},
        auth=ES_AUTH,
        verify=False,
    )
    log.info("%s" % response.status_code)
    log.info("%s" % response.text)
def init_default_store():
    """Reset the default feature store.

    Issues a DELETE followed by a PUT against ``_mtr`` (resolved against
    ``ES_HOST``), logging each request line and the resulting status code.
    """
    store_url = urljoin(ES_HOST, '_mtr')
    # Order matters: the store is dropped first, then created again.
    steps = (
        ("DELETE %s", requests.delete),
        ("PUT %s", requests.put),
    )
    for announcement, send in steps:
        Logger.logger.info(announcement % store_url)
        outcome = send(store_url, auth=ES_AUTH, verify=False)
        Logger.logger.info("%s" % outcome.status_code)
if __name__ == "__main__":
    # Script entry point: reset the feature store, then upload the configured
    # feature set. (The unused ``from time import sleep`` was dropped.)
    init_default_store()
    load_f(FEATURE_SET_NAME)
我是 PySpark 新手,希望将上面的 Python 代码转换为 PySpark 代码。我有以下问题:
这种转换可行吗?
如果可行,应该怎么做?
暂无答案!
目前还没有任何答案,快来回答吧!