我有一个flink程序,它使用自定义生成器生成数据,同时,我想将数据保存到sqlite3数据库中。一种方法是在map函数中将数据解析为所需的格式,然后从map操作中插入数据。但我做不到。我用来将数据写入sqlite3的代码如下所示,我并没有提及整个代码,所以数据库连接也在代码的初始部分,下面不包括
DataStream<Event> merged = stream1.union(stream2,stream3);
merged.print();
// sending data to Timekeeper via socket
merged.map(new MapFunction<Event, String>() {
@Override
public String map(Event event) throws Exception {
String tuple = event.toString();
Integer patient_id = event.getPatinet_id();
Integer sensor_id = event.getSensor_id();
Integer uid = event.getUid();
Long time = event.getTime();
Integer value = event.getValue();
String sql1 = "INSERT into mobile_events ( patientid , sensorid , uid , eatg ,valuez ) VALUES (" +
+ patient_id + ","
+ sensor_id + ","
+ uid + ","
+ time + ","
+ value
+ ");" ;
stmt.executeUpdate(sql1);
return tuple + "\n";
}
});
问题是
Error:(124, 22) java: local variables referenced from an inner class must be final or effectively final
如果我让它成为最终的,那么我会得到以下错误
Caused by: java.io.NotSerializableException: org.sqlite.jdbc4.JDBC4Statement
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
... 4 more
暂无答案!
目前还没有任何答案,快来回答吧!