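I am a long-time C# programmer, but I am just getting started with .NET for Apache Spark. Following a number of "getting started" instructions and videos, I installed: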
- 7-Zip
- Java 8
- Apache Spark, downloaded from https://spark.apache.org/downloads.html
- .NET for Apache Spark v2.1.1
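- WinUtils.exe

I am running this on Windows 10.

**Problem:** After adding a column with DataFrame.WithColumn() using a UDF, calling DataFrame.Show() always fails with the error: [2023-02-07T15:45:31.3903664Z] [DESKTOP-H37P8Q0] [Error] [TaskRunner] [0] ProcessStream() failed with exception: System.ArgumentNullException: Value cannot be null. Parameter name: type

TestCases.csv looks like this: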
+----+----+----+----+
|name|posX|posY|rate|
+----+----+----+----+
|   A| 100| -20|  20|
|   B| 400|  30|  45|
|   C| -10|  75|  61|
|   D| 100| 120|  76|
|   E|  48|  96|  88|
|   F| 808|  46|  55|
|   G|1200|  91|  99|
+----+----+----+----+
OrderList.csv looks like this, but it has 5040 rows:
+-------------+
|        names|
+-------------+
|A|B|C|D|E|F|G|
|A|B|C|D|E|G|F|
|A|B|C|D|F|E|G|
|A|B|C|D|F|G|E|
|A|B|C|D|G|E|F|
|A|B|C|D|G|F|E|
|A|B|C|E|D|F|G|
|A|B|C|E|D|G|F|
|A|B|C|E|F|D|G|
|A|B|C|E|F|G|D|
|A|B|C|E|G|D|F|
|A|B|C|E|G|F|D|
|A|B|C|F|D|E|G|
|A|B|C|F|D|G|E|
|A|B|C|F|E|D|G|
|A|B|C|F|E|G|D|
|A|B|C|F|G|D|E|
|A|B|C|F|G|E|D|
|A|B|C|G|D|E|F|
|A|B|C|G|D|F|E|
+-------------+
Here is the Main() function:

    static void Main(string[] args)
    {
        // Create Spark session
        SparkSession spark =
            SparkSession
                .Builder()
                .AppName("test_Spark_app")
                .GetOrCreate();

        // We don't want to see all those INFO messages
        spark.SparkContext.SetLogLevel("WARN");

        // Create initial DataFrame
        DataFrame dataFrame = spark
            .Read()
            .Schema("name STRING, posX INT, posY INT, rate INT")
            .Csv(@"C:\TestSparkApp\bin\Debug\net6.0\TestCases.csv");

        // print out the data
        dataFrame.Show();

        GetOrders();

        // Create orders DataFrame
        DataFrame ordersFrame = spark
            .Read()
            .Schema("names STRING")
            .Csv(@"C:\TestSparkApp\bin\Debug\net6.0\OrderList.csv");

        // print out the data
        ordersFrame.Show();

        // add all the players to each row
        string playersAsString = Collector.PlayersToString(_players);
        ordersFrame = ordersFrame.WithColumn("players", Lit(playersAsString));

        // print out the data
        ordersFrame.Show();

        // user defined function
        Func<Column, Column, Column> GetSubst = Udf<string, string, int>(
            (strOrder, strPlayers) =>
            {
                return GetSubstance(strOrder, strPlayers);
            });

        // call the user defined function and add a new column to the dataframe
        ordersFrame = ordersFrame.WithColumn("substance", GetSubst(ordersFrame["names"], ordersFrame["players"]).Cast("Integer"));

        // if I comment out the following, it does NOT produce the error:
        // print out the data
        ordersFrame.Show(20, 20, false);

        // Stop Spark session
        spark.Stop();
    }

Here is the UDF function:

    public static int GetSubstance(string strOrder, string strPlayers)
    {
        // to simplify things, we are just returning zero
        return 0;
    }

Here is the output:
> C:\TestSparkApp>spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local bin\Debug\net6.0\microsoft-spark-2-4_2.11-2.1.1.jar dotnet bin\Debug\net6.0\TestSparkApp.dll
> 23/02/07 10:45:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> [2023-02-07T15:45:18.5377868Z] [DESKTOP-H37P8Q0] [Info] [ConfigurationService] Using port 50256 for connection.
> [2023-02-07T15:45:18.5490854Z] [DESKTOP-H37P8Q0] [Info] [JvmBridge] JvMBridge port is 50256
> [2023-02-07T15:45:18.5529043Z] [DESKTOP-H37P8Q0] [Info] [JvmBridge] The number of JVM backend thread is set to 10. The max number of concurrent sockets in JvmBridge is set to 7.
> +----+----+----+----+
> |name|posX|posY|rate|
> +----+----+----+----+
> |   A| 100| -20|  20|
> |   B| 400|  30|  45|
> |   C| -10|  75|  61|
> |   D| 100| 120|  76|
> |   E|  48|  96|  88|
> |   F| 808|  46|  55|
> |   G|1200|  91|  99|
> +----+----+----+----+
>
> +-------------+
> |        names|
> +-------------+
> |A|B|C|D|E|F|G|
> |A|B|C|D|E|G|F|
> |A|B|C|D|F|E|G|
> |A|B|C|D|F|G|E|
> |A|B|C|D|G|E|F|
> |A|B|C|D|G|F|E|
> |A|B|C|E|D|F|G|
> |A|B|C|E|D|G|F|
> |A|B|C|E|F|D|G|
> |A|B|C|E|F|G|D|
> |A|B|C|E|G|D|F|
> |A|B|C|E|G|F|D|
> |A|B|C|F|D|E|G|
> |A|B|C|F|D|G|E|
> |A|B|C|F|E|D|G|
> |A|B|C|F|E|G|D|
> |A|B|C|F|G|D|E|
> |A|B|C|F|G|E|D|
> |A|B|C|G|D|E|F|
> |A|B|C|G|D|F|E|
> +-------------+
> only showing top 20 rows
>
> +-------------+--------------------+
> |        names|             players|
> +-------------+--------------------+
> |A|B|C|D|E|F|G|A,100,-20,20|B,40...|
> |A|B|C|D|E|G|F|A,100,-20,20|B,40...|
> |A|B|C|D|F|E|G|A,100,-20,20|B,40...|
> |A|B|C|D|F|G|E|A,100,-20,20|B,40...|
> |A|B|C|D|G|E|F|A,100,-20,20|B,40...|
> |A|B|C|D|G|F|E|A,100,-20,20|B,40...|
> |A|B|C|E|D|F|G|A,100,-20,20|B,40...|
> |A|B|C|E|D|G|F|A,100,-20,20|B,40...|
> |A|B|C|E|F|D|G|A,100,-20,20|B,40...|
> |A|B|C|E|F|G|D|A,100,-20,20|B,40...|
> |A|B|C|E|G|D|F|A,100,-20,20|B,40...|
> |A|B|C|E|G|F|D|A,100,-20,20|B,40...|
> |A|B|C|F|D|E|G|A,100,-20,20|B,40...|
> |A|B|C|F|D|G|E|A,100,-20,20|B,40...|
> |A|B|C|F|E|D|G|A,100,-20,20|B,40...|
> |A|B|C|F|E|G|D|A,100,-20,20|B,40...|
> |A|B|C|F|G|D|E|A,100,-20,20|B,40...|
> |A|B|C|F|G|E|D|A,100,-20,20|B,40...|
> |A|B|C|G|D|E|F|A,100,-20,20|B,40...|
> |A|B|C|G|D|F|E|A,100,-20,20|B,40...|
> +-------------+--------------------+
> only showing top 20 rows
>
> [2023-02-07T15:45:30.2938453Z] [DESKTOP-H37P8Q0] [Debug] [ConfigurationService] Using the DOTNET_WORKER_DIR environment variable to construct .NET worker path: C:\bin\Microsoft.Spark.Worker-2.1.1\Microsoft.Spark.Worker.exe.
> DotnetWorker PID:[3636] Args:[-m pyspark.worker] SparkVersion:[2.4.5]
> [2023-02-07T15:45:31.0778526Z] [DESKTOP-H37P8Q0] [Info] [SimpleWorker] RunSimpleWorker() is starting with port = 50281.
> [2023-02-07T15:45:31.1251548Z] [DESKTOP-H37P8Q0] [Info] [TaskRunner] [0] Starting with ReuseSocket[False].
> [2023-02-07T15:45:31.1560166Z] [DESKTOP-H37P8Q0] [Info] [ConfigurationService] 'DOTNETBACKEND_PORT' environment variable is not set.
> [2023-02-07T15:45:31.1560166Z] [DESKTOP-H37P8Q0] [Info] [ConfigurationService] Using port 5567 for connection.
> [2023-02-07T15:45:31.1719795Z] [DESKTOP-H37P8Q0] [Info] [JvmBridge] JvMBridge port is 5567
> [2023-02-07T15:45:31.1719795Z] [DESKTOP-H37P8Q0] [Info] [JvmBridge] The number of JVM backend thread is set to 10. The max number of concurrent sockets in JvmBridge is set to 7.
> [2023-02-07T15:45:31.2810367Z] [DESKTOP-H37P8Q0] [Warn] [AssemblyLoader] Assembly 'System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e' file not found 'System.Private.CoreLib[.dll,.exe,.ni.dll,.ni.exe]' in 'C:\TestSparkApp\bin\Debug\net6.0,C:\Users\dtaylor\AppData\Local\Temp\spark-70f58a5a-d0d9-4cc7-b6c1-5d17da493edf\userFiles-432f5bf3-f3c2-4afa-a6ba-d3a6ce4a9e4e,C:\TestSparkApp,C:\bin\Microsoft.Spark.Worker-2.1.1\'
> [2023-02-07T15:45:31.2810367Z] [DESKTOP-H37P8Q0] [Warn] [AssemblyLoader] Assembly 'System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e' file not found 'System.Private.CoreLib[.dll,.exe,.ni.dll,.ni.exe]' in 'C:\TestSparkApp\bin\Debug\net6.0,C:\Users\dtaylor\AppData\Local\Temp\spark-70f58a5a-d0d9-4cc7-b6c1-5d17da493edf\userFiles-432f5bf3-f3c2-4afa-a6ba-d3a6ce4a9e4e,C:\TestSparkApp,C:\bin\Microsoft.Spark.Worker-2.1.1\'
> [2023-02-07T15:45:31.3903664Z] [DESKTOP-H37P8Q0] [Error] [TaskRunner] [0] ProcessStream() failed with exception: System.ArgumentNullException: Value cannot be null.
> Parameter name: type
> at System.Activator.CreateInstance(Type type, BindingFlags bindingAttr, Binder binder, Object[] args, CultureInfo culture, Object[] activationAttributes)
> at System.Activator.CreateInstance(Type type, BindingFlags bindingAttr, Binder binder, Object[] args, CultureInfo culture)
> at Microsoft.Spark.Utils.CommandSerDe.CreateUdfWrapperDelegate[T](Type type, Object[] parameters)
> at Microsoft.Spark.Utils.CommandSerDe.Deserialize[T](Stream stream, SerializedMode& serializerMode, SerializedMode& deserializerMode, String& runMode)
> at Microsoft.Spark.Worker.Processor.CommandProcessor.ReadSqlCommands(PythonEvalType evalType, Stream stream)
> at Microsoft.Spark.Worker.Processor.CommandProcessor.ReadSqlCommands(PythonEvalType evalType, Stream stream, Version version)
> at Microsoft.Spark.Worker.Processor.CommandProcessor.Process(Stream stream)
> at Microsoft.Spark.Worker.Processor.PayloadProcessor.Process(Stream stream)
> at Microsoft.Spark.Worker.TaskRunner.ProcessStream(Stream inputStream, Stream outputStream, Version version, Boolean& readComplete)
> removed for brevity
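If I comment out the Show command, the error is not thrown. The error message says that a parameter named "type" is null, and that this parameter is used in a function called ProcessStream(). The error appears to come from TaskRunner.

Edit:
It turns out that this error shows up on many DataFrame method calls, but only after a column has been added with WithColumn().
Has anyone else run into this problem?

Edit:
I have not received any response in a week. Should I take that to mean that .NET for Apache Spark is no longer a viable product? Should I give up on it?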
1 answer
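According to the .NET for Apache Spark documentation, the type name passed to the 'Cast' method should start with a lowercase letter. In your case, you could try 'int' instead of 'Integer'.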
https://learn.microsoft.com/en-us/dotnet/api/microsoft.spark.sql.column.cast?view=spark-dotnet
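
As a rough illustration of that suggestion, here is a minimal sketch of the WithColumn() call from the question with only the cast string changed. The class name `CastExample`, the app name, and the single-row DataFrame built with `spark.Sql` are hypothetical stand-ins for the question's OrderList.csv data. It assumes the Microsoft.Spark package is referenced and the program is launched through spark-submit as in the question. The thread does not confirm whether this change resolves the ArgumentNullException.

```csharp
using System;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

internal static class CastExample
{
    private static void Main()
    {
        SparkSession spark = SparkSession
            .Builder()
            .AppName("cast_example") // hypothetical app name
            .GetOrCreate();

        // Hypothetical one-row stand-in for the question's ordersFrame.
        DataFrame ordersFrame = spark.Sql(
            "SELECT 'A|B|C|D|E|F|G' AS names, 'A,100,-20,20' AS players");

        // Same shape as the question's UDF: two strings in, one int out.
        Func<Column, Column, Column> getSubst =
            Udf<string, string, int>((strOrder, strPlayers) => 0);

        // The only change from the question: Cast("int") instead of Cast("Integer").
        ordersFrame = ordersFrame.WithColumn(
            "substance",
            getSubst(ordersFrame["names"], ordersFrame["players"]).Cast("int"));

        ordersFrame.Show(20, 20, false);

        spark.Stop();
    }
}
```

Because the UDF is already declared to return `int`, the cast may be redundant. Dropping `.Cast(...)` altogether would be another way to test whether the cast string is involved at all.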