sqlite—无法将数据流写入flink中map函数内的sqlite3

f0ofjuux  于 2021-06-25  发布在  Flink
关注(0)|答案(0)|浏览(341)

我有一个flink程序,它使用自定义生成器生成数据,同时,我想将数据保存到sqlite3数据库中。一种方法是在map函数中将数据解析为所需的格式,然后从map操作中插入数据。但我做不到。我用来将数据写入sqlite3的代码如下所示,我并没有提及整个代码,所以数据库连接也在代码的初始部分,下面不包括

DataStream<Event> merged = stream1.union(stream2,stream3);

    merged.print();

    // sending data to Timekeeper via socket

    merged.map(new MapFunction<Event, String>() {

        @Override
        public String map(Event event) throws Exception {

            String tuple = event.toString();
             Integer patient_id = event.getPatinet_id();
             Integer sensor_id = event.getSensor_id();
             Integer uid = event.getUid();
             Long time = event.getTime();
             Integer value = event.getValue();

                 String sql1 =   "INSERT into mobile_events ( patientid , sensorid , uid , eatg ,valuez ) VALUES (" +
                         + patient_id + ","
                         + sensor_id + ","
                         + uid + ","
                         + time + ","
                         + value
                         + ");" ;

                 stmt.executeUpdate(sql1);

            return tuple + "\n";

        }
    });

问题是

Error:(124, 22) java: local variables referenced from an inner class must be final or effectively final

如果我让它成为最终的,那么我会得到以下错误

Caused by: java.io.NotSerializableException: org.sqlite.jdbc4.JDBC4Statement
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
... 4 more

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题