How to create a UDF that receives a row and duplicates it in C#

ecfsfe2w  posted on 2021-07-13 in Spark

I need to duplicate rows in a DataFrame so that the data is upsampled to one row per second.
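To make the intended expansion concrete, here is a minimal sketch in plain C# with no Spark dependency. It assumes, as the loops in the code below suggest, that a row with duration `d` starting at time `t` should end up as `d` rows at `t`, `t+1s`, ..., `t+(d-1)s`, and that a missing duration leaves the row untouched. The names `Upsampling` and `ExpandToSeconds` are hypothetical and used only for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class Upsampling
{
    // Hypothetical helper: returns every timestamp a single row should occupy
    // after expansion, starting with the row's own timestamp.
    public static List<DateTime> ExpandToSeconds(DateTime start, int? duration)
    {
        var result = new List<DateTime> { start };

        // Assumption: a null duration (or one below 2) means no copies are added.
        int extraRows = (duration ?? 1) - 1;
        for (int i = 1; i <= extraRows; i++)
        {
            result.Add(start.AddSeconds(i));
        }

        return result;
    }
}
```

For example, `Upsampling.ExpandToSeconds(t, 3)` yields three timestamps one second apart, while a duration of 1 or `null` yields only the original timestamp.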
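I tried two approaches. The first collects the DataFrame's rows, adds the duplicated rows to a list, and then unions that list with the original DataFrame: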

private DataFrame DuplicateRowsEverySecond(DataFrame dataFrame)
    {

        var GRA = new List<GenericRow>();
        Timestamp timestamp;
        double meanCurrentRMS;
        int subjectKey;
        int? duration;

        object[] array;

        var schema = dataFrame.Schema();

        foreach (var item in dataFrame.Collect())
        {
            duration = item.GetAs<int?>("duration");
            if (duration == 1)
            {
                continue;
            }

            timestamp = item.GetAs<Timestamp>("Timestamp");
            meanCurrentRMS = item.GetAs<double>("MeanCurrentRms");
            subjectKey = item.GetAs<int>("SubjectKey");

            for (int i = 0; i < duration - 1; i++)
            {
                timestamp = new Timestamp(timestamp.ToDateTime().AddSeconds(1));
                array = new object[] { timestamp, meanCurrentRMS, subjectKey, duration };
                GRA.Add(new GenericRow(array));
            }

        }

        var newDF = this.session.CreateDataFrame(GRA, schema);
        DataFrame retVal = dataFrame.Union(newDF);
        return retVal;
    }

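As far as I can tell, this method fails after a few rows with an Apache Spark error: Unhandled exception. System.InvalidCastException: Unable to cast object of type 'Microsoft.Spark.Sql.Types.Timestamp' to type 'System.Int32'.
I would like to keep this code and fix the error somehow.
The other solution I tried to implement uses a UDF. I need the UDF to receive a row and return a list of rows, but I can't seem to find the right syntax: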

private DataFrame DuplicateRowsEverySecond(DataFrame dataFrame)
    {
        Func<Row, List<Row>> udfFunc = Udf<Row, List<Row>>((item) =>
        {
            Timestamp timestamp = item.GetAs<Timestamp>("Timestamp");
            double meanCurrentRMS = item.GetAs<double>("MeanCurrentRms");
            int subjectKey = item.GetAs<int>("SubjectKey");
            var duration = item.GetAs<int?>("duration");

            List<Row> retVal = new List<Row> { item };

            if (duration == 1)
            {
                return retVal;
            }

            for (int i = 0; i < duration - 1; i++)
            {
                timestamp = new Timestamp(timestamp.ToDateTime().AddSeconds(1));
                Console.WriteLine("************timestamp=" + timestamp);
                retVal.Add(new GenericRow(new object[] { timestamp, meanCurrentRMS, subjectKey, duration }));
                //Console.WriteLine((new GenericRow(array)).ToString());
            }

            return retVal;
        });

        var GRA = new List<GenericRow>();
        GRA.Add(udfFunc(dataFrame));
        var newDF = this.session.CreateDataFrame(GRA, schema);
        DataFrame retVal = dataFrame.Union(newDF);
        return retVal;
    }
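One possible way to get the same result without a UDF that returns rows is to let Spark generate the copies with its built-in `sequence` and `explode` SQL functions. The following is a sketch under stated assumptions, not a verified fix: it assumes Spark 2.4 or later (where `sequence` is available), the column names `Timestamp` and `duration` from the question, and that sub-second precision can be dropped, because casting a timestamp to `long` truncates it to whole seconds. The temporary column name `sec_offset` is hypothetical.

```csharp
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

public static class RowDuplication
{
    public static DataFrame DuplicateRowsEverySecond(DataFrame dataFrame)
    {
        return dataFrame
            // One output row per second offset 0 .. duration-1.
            // A null or non-positive duration is treated as 1, so the row is kept once.
            .WithColumn(
                "sec_offset",
                Explode(Expr("sequence(0, greatest(coalesce(duration, 1), 1) - 1)")))
            // Shift the timestamp by the offset in whole seconds
            // (assumption: fractional seconds are not needed).
            .WithColumn(
                "Timestamp",
                Expr("cast(cast(`Timestamp` as long) + sec_offset as timestamp)"))
            .Drop("sec_offset");
    }
}
```

Unlike the `Union`-based versions above, this keeps each copy next to its original row rather than appending the copies at the end, and it never moves the data to the driver, so no `Collect()` or `CreateDataFrame` call is needed.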
