我想应用sklearn.preprocessing的标签编码器功能,使用kafka和spark结构化流媒体对流媒体数据进行预处理。目前的想法是:
当我每次从kafka源接收一批数据时,我想在该批数据上实现一个函数,如下所示:
def use_label_encoder(label_encoder, y):
return label_encoder.transform(y) + 1
to_transform_class_val = udf(use_label_encoder, IntegerType())
以下是模式:
schema = StructType([
StructField("sepal_length_in_cm", StringType()), \
StructField("sepal_width_in_cm", StringType()), \
StructField("petal_length_in_cm", StringType()), \
StructField("petal_width_in_cm", StringType()), \
StructField("class", StringType())
])
df = df.selectExpr("CAST(value AS STRING)")
df1 = df.select(from_json(df.value, schema).alias("json"))
当我尝试定义标签\u编码器时:
label_encoder = enc.fit(df1.select(to_upper("json.class")))
它给出了一个错误“错误的输入形状”
我用于非流数据的等效代码是:
y = df['class'].values
enc = LabelEncoder()
label_encoder = enc.fit(y)
y = label_encoder.transform(y) + 1
有谁能帮我把sklearn方法应用到流数据上吗?
1条答案
按热度按时间yebdmbv41#
你能稍后再加1吗?你的星火密码会变成