I need to duplicate rows in a DataFrame so that the sampling interval becomes 1 second.
I have tried two approaches. The first collects the DataFrame's data, appends the duplicated rows to a list, and then unions that with the original DataFrame:
private DataFrame DuplicateRowsEverySecond(DataFrame dataFrame)
{
    var GRA = new List<GenericRow>();
    Timestamp timestamp;
    double meanCurrentRMS;
    int subjectKey;
    int? duration;
    object[] array;
    var schema = dataFrame.Schema();
    foreach (var item in dataFrame.Collect())
    {
        duration = item.GetAs<int?>("duration");
        if (duration == 1)
        {
            continue;
        }
        timestamp = item.GetAs<Timestamp>("Timestamp");
        meanCurrentRMS = item.GetAs<double>("MeanCurrentRms");
        subjectKey = item.GetAs<int>("SubjectKey");
        for (int i = 0; i < duration - 1; i++)
        {
            timestamp = new Timestamp(timestamp.ToDateTime().AddSeconds(1));
            array = new object[] { timestamp, meanCurrentRMS, subjectKey, duration };
            GRA.Add(new GenericRow(array));
        }
    }
    var newDF = this.session.CreateDataFrame(GRA, schema);
    DataFrame retVal = dataFrame.Union(newDF);
    return retVal;
}
As far as I can tell, this approach runs into an Apache Spark error after a few rows: Unhandled exception. System.InvalidCastException: Unable to cast object of type 'Microsoft.Spark.Sql.Types.Timestamp' to type 'System.Int32'.
I would like to keep this code and somehow fix the error.
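My current guess is that the object[] I build is not in the same column order as dataFrame.Schema(), so when CreateDataFrame serializes the rows it reads the Timestamp where the schema expects an int. A minimal sketch of the fix I have in mind (untested, and assuming the order mismatch really is the cause) collects the values by column name and emits them in whatever order the schema declares its fields:

// Sketch only -- assumes the InvalidCastException comes from the object[]
// order not matching dataFrame.Schema(). Needs System.Collections.Generic,
// System.Linq, Microsoft.Spark.Sql and Microsoft.Spark.Sql.Types.
private DataFrame DuplicateRowsEverySecond(DataFrame dataFrame)
{
    var schema = dataFrame.Schema();
    var newRows = new List<GenericRow>();

    foreach (Row item in dataFrame.Collect())
    {
        int? duration = item.GetAs<int?>("duration");
        if (duration == null || duration == 1)
        {
            continue;
        }

        var timestamp = item.GetAs<Timestamp>("Timestamp");

        // Keep the values keyed by column name so the order can be fixed below.
        var values = new Dictionary<string, object>
        {
            ["Timestamp"] = timestamp,
            ["MeanCurrentRms"] = item.GetAs<double>("MeanCurrentRms"),
            ["SubjectKey"] = item.GetAs<int>("SubjectKey"),
            ["duration"] = duration
        };

        for (int i = 0; i < duration - 1; i++)
        {
            timestamp = new Timestamp(timestamp.ToDateTime().AddSeconds(1));
            values["Timestamp"] = timestamp;

            // Emit the values in the exact order the schema declares them, so
            // CreateDataFrame never sees a Timestamp where it expects an int.
            newRows.Add(new GenericRow(
                schema.Fields.Select(f => values[f.Name]).ToArray()));
        }
    }

    return dataFrame.Union(this.session.CreateDataFrame(newRows, schema));
}

If one of the column names is wrong, the dictionary lookup would throw immediately, which would at least make the mismatch obvious.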
The other solution I tried to implement uses a UDF. I need the UDF to receive a row and return a column of rows, but I can't seem to find the right syntax:
private DataFrame DuplicateRowsEverySecond(DataFrame dataFrame)
{
    Func<Row, List<Row>> udfFunc = Udf<Row, List<Row>>((item) =>
    {
        Timestamp timestamp = item.GetAs<Timestamp>("Timestamp");
        double meanCurrentRMS = item.GetAs<double>("MeanCurrentRms");
        int subjectKey = item.GetAs<int>("SubjectKey");
        var duration = item.GetAs<int?>("duration");
        List<Row> retVal = new List<Row> { item };
        if (duration == 1)
        {
            return retVal;
        }
        for (int i = 0; i < duration - 1; i++)
        {
            timestamp = new Timestamp(timestamp.ToDateTime().AddSeconds(1));
            Console.WriteLine("************timestamp=" + timestamp);
            retVal.Add(new GenericRow(new object[] { timestamp, meanCurrentRMS, subjectKey, duration }));
        }
        return retVal;
    });
    var GRA = new List<GenericRow>();
    GRA.Add(udfFunc(dataFrame));
    var newDF = this.session.CreateDataFrame(GRA, schema);
    DataFrame retVal = dataFrame.Union(newDF);
    return retVal;
}
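If a UDF that returns a list of rows is simply not supported, the fallback I am considering is to skip the UDF entirely and let Spark generate the extra rows with built-in functions: explode sequence(0, duration - 1) into a per-second offset column, then shift the timestamp by that many seconds via epoch arithmetic. A sketch (untested, and assuming duration is never null, since explode drops rows whose sequence is null):

// Sketch only -- avoids the row-returning UDF. Assumes
// "using static Microsoft.Spark.Sql.Functions", as the bare Udf call
// above implies.
private DataFrame DuplicateRowsEverySecond(DataFrame dataFrame)
{
    return dataFrame
        // One row per value in [0, duration - 1].
        .WithColumn("offset", Explode(Expr("sequence(0, duration - 1)")))
        // Shift the original timestamp by 'offset' seconds.
        .WithColumn("Timestamp",
            Expr("cast(cast(Timestamp as long) + offset as timestamp)"))
        .Drop("offset");
}

Unlike the collect-and-Union approach above, the exploded result already contains the original row (offset 0), so no Union is needed.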