我正在尝试使用nifi的putcassandrarecord处理器将一些json记录插入到cassandra数据库中。我试图在cassandra中插入一个timestamp类型,但是nifi抱怨输入字符串“2019-02-02t08:00:00.000”出现numberformatexception
所述时间戳字段的cassandra数据类型是(ts timestamp),我正在使用avro模式:{“name”:“ts”,“type”:{“type”:“long”,“logicaltype”:“timestamp millis”}}
{
"name": "app.records",
"type": "record",
"fields": [
{ "name": "ts", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{ "name": "app_name", "type": "string" },
nifi日志显示它能够解析json对象,但无法将其转换为记录。。。
2019-05-13 21:13:04,036 ERROR [Timer-Driven Process Thread-2] o.a.n.p.cassandra.PutCassandraRecord PutCassandraRecord[id=ecb33d77-cc4a-17f5-23a8-e002e1777a1c] Unable to write the records into Cassandra table due to org.apache.nifi.serialization.MalformedRecordException: Successfully parsed a JSON object from input but failed to convert into a Record object with the given schema: org.apache.nifi.serialization.MalformedRecordException: Successfully parsed a JSON object from input but failed to convert into a Record object with the given schema
org.apache.nifi.serialization.MalformedRecordException: Successfully parsed a JSON object from input but failed to convert into a Record object with the given schema
at org.apache.nifi.json.AbstractJsonRowRecordReader.nextRecord(AbstractJsonRowRecordReader.java:98)
at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
at org.apache.nifi.processors.cassandra.PutCassandraRecord.onTrigger(PutCassandraRecord.java:151)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:209)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: For input string: "2019-02-02T08:00:35.473"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Long.parseLong(Long.java:589)
at java.lang.Long.parseLong(Long.java:631)
at org.apache.nifi.serialization.record.util.DataTypeUtils.toTimestamp(DataTypeUtils.java:1057)
at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:156)
at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:120)
at org.apache.nifi.json.JsonTreeRowRecordReader.convertField(JsonTreeRowRecordReader.java:170)
at org.apache.nifi.json.JsonTreeRowRecordReader.convertJsonNodeToRecord(JsonTreeRowRecordReader.java:137)
at org.apache.nifi.json.JsonTreeRowRecordReader.convertJsonNodeToRecord(JsonTreeRowRecordReader.java:83)
at org.apache.nifi.json.JsonTreeRowRecordReader.convertJsonNodeToRecord(JsonTreeRowRecordReader.java:74)
at org.apache.nifi.json.AbstractJsonRowRecordReader.nextRecord(AbstractJsonRowRecordReader.java:93)
... 14 common frames omitted
这些类型似乎都是正确的。任何帮助都将不胜感激。
2条答案
按热度按时间pengsaosao1#
问题是您试图在没有指定日期格式的情况下插入timestamp字段。相应的代码如下所示:
如果输入数据是字符串,则尝试获取其格式字符串,如果格式字符串是有效的格式化程序,则使用它获取日期。如果没有指定格式字符串或格式字符串无效,那么nifi将尝试使用
Long.parseLong
.您需要使用如下方式执行相应字段的显式强制转换:
ou6hu8tu2#
我最终将datetime转换为epoch时间戳,并将其转换为毫秒,并将其转换为long,以便它与我的avro模式一起工作。