无法从csv输入创建嵌套的json输出(聚合)

ut6juiuv  于 2021-06-15  发布在  ElasticSearch
关注(0)|答案(0)|浏览(253)

我遇到的问题是:需要按 id 对 CSV 输入进行聚合,而且期望的输出包含多层嵌套。我已经能够实现单层嵌套,但在进一步嵌套时,写不出正确的语法。
输入:

# Test input: three generator plugins, each configured with one hard-coded CSV
# line and count => 1. All three lines start with the same UUID and differ only
# in the fourth and fifth columns (HCC/0.83, LACE/12, CCI/0.23), so they are
# meant to be merged into a single document downstream.
input {
  generator {
    id => "first"
    type => 'csv'
    message => '829cd0e0-8d24-4f25-92e1-724e6bd811e0,GSIH1,2017-10-10 00:00:00.000,HCC,0.83,COMMUNITYID1'
    count => 1
  }
  generator {
    id => "second"
    type => 'csv'
    message => '829cd0e0-8d24-4f25-92e1-724e6bd811e0,GSIH1,2017-10-10 00:00:00.000,LACE,12,COMMUNITYID1'
    count => 1
  }
  generator {
    id => "third"
    type => 'csv'
    message => '829cd0e0-8d24-4f25-92e1-724e6bd811e0,GSIH1,2017-10-10 00:00:00.000,CCI,0.23,COMMUNITYID1'
    count => 1
  }
}

# Per-row shaping: parse the CSV line, then move/add/rewrite fields so each
# event carries the nested field names used by the aggregate step.
filter
{
  # Split the message into six named columns.
  csv {
    columns => ['id', 'reference', 'occurrenceDateTime', 'code', 'probabilityDecimal', 'comment']
  }

  # Move three flat columns to nested field references.
  mutate {
    rename => {
      "reference" => "[subject][reference]"
      "code" => "[prediction][outcome][coding][code]"
      "probabilityDecimal" => "[prediction][probabilityDecimal]"
    }
  }
  # Add constant fields. Note the outcome text is the same fixed string for
  # every row, regardless of the row's code (HCC, LACE or CCI).
  mutate {
    add_field => {
      "[resourceType]" => "RiskAssessment"
      "[prediction][outcome][text]" => "Member HCC score based on CMS HCC V22 risk adjustment model"
      "[status]" => "final"
    }
  }
  # Prefix the existing values; this runs after the rename above so that
  # [subject][reference] already exists.
  mutate {
    update => {
      "[subject][reference]" => "Patient/%{[subject][reference]}"
      "[comment]" => "CommunityId/%{[comment]}"
    }
  }
  # Drop bookkeeping fields that should not appear in the final document.
  mutate {
    remove_field => [ "@timestamp", "sequence", "@version", "message", "host", "type" ]
  }
}

# Merge all rows that share the same id into one event whose "prediction"
# field is an array holding one entry per source row.
filter {
  aggregate {
    task_id => "%{id}"
    # The Ruby code copies the scalar fields into the shared map, makes sure the
    # prediction list exists (initialised to an empty array on the first row),
    # appends this row's flattened prediction, and finally cancels the row so
    # that only the aggregated event reaches the outputs.
    code => "
      map['resourceType'] = event.get('resourceType')
      map['id'] = event.get('id')
      map['status'] = event.get('status')
      map['occurrenceDateTime'] = event.get('occurrenceDateTime')
      map['comment'] = event.get('comment')
      map['[reference]'] = event.get('[subject][reference]')
      map['[prediction]'] ||= []
      map['[prediction]'] << {
        'code' => event.get('[prediction][outcome][coding][code]'),
        'text' => event.get('[prediction][outcome][text]'),
        'probabilityDecimal'=> event.get('[prediction][probabilityDecimal]')
      }
      event.cancel()
    "
    # Emit the accumulated map as a new event when a different task id shows up;
    # the timeout flushes the last map, since no further id follows it.
    # NOTE(review): the aggregate plugin documentation asks for a single
    # pipeline worker (-w 1) - confirm how this pipeline is started.
    push_previous_map_as_event => true
    timeout => 3
  }
  # Strip the metadata fields from the aggregated event.
  mutate {
    remove_field => [ "@timestamp", "tags", "@version"]
  }
}

# Send each event to Elasticsearch (index "deepak" on localhost:9201, using the
# riskFactor template file) and also print it to stdout as JSON for inspection.
output{
  elasticsearch {
    template => "templates/riskFactor.json"
    template_name => "riskFactor"
    action => "index"
    hosts => ["localhost:9201"]
    index => ["deepak"]
  }
  stdout {
    codec => json{}
  }
}

输出:

{
  "reference": "Patient/GSIH1",
  "comment": "CommunityId/COMMUNITYID1",
  "id": "829cd0e0-8d24-4f25-92e1-724e6bd811e0",
  "status": "final",
  "resourceType": "RiskAssessment",
  "occurrenceDateTime": "2017-10-10 00:00:00.000",
  "prediction": [
    {
      "probabilityDecimal": "0.83",
      "code": "HCC",
      "text": "Member HCC score based on CMS HCC V22 risk adjustment model"
    },
    {
      "probabilityDecimal": "0.23",
      "code": "CCI",
      "text": "Member HCC score based on CMS HCC V22 risk adjustment model"
    },
    {
      "probabilityDecimal": "12",
      "code": "LACE",
      "text": "Member HCC score based on CMS HCC V22 risk adjustment model"
    }
  ]
}

所需输出:

{
  "resourceType": "RiskAssessment",
  "id": "829cd0e0-8d24-4f25-92e1-724e6bd811e0",
  "status": "final",
  "subject": {
    "reference": "Patient/GSIH1"
  },
  "occurrenceDateTime": "2017-10-10 00:00:00.000",
  "prediction": [
    {
      "outcome": {
        "coding": [
          {
            "code": "HCC"
          }
        ],
        "text": "Member HCC score based on CMS HCC V22 risk adjustment model"
      },
      "probabilityDecimal": 0.83
    },
    {
      "outcome": {
        "coding": [
          {
            "code": "CCI"
          }
        ],
        "text": "Member HCC score based on CMS HCC V22 risk adjustment model"
      },
      "probabilityDecimal": 0.23
    }
  ],
  "comment": "CommunityId/COMMUNITYID1"
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题