elasticsearch 从即将到来的脉冲星中插入大量数据

yws3nbqq  于 2022-11-02  发布在  ElasticSearch
关注(0)|答案(1)|浏览(139)

我不得不使用goelastic库插入大量来自未来脉冲星的数据。但我有一个问题。
首先,Pulsar发送1000个数据每部分散装。然后,当我插入弹性,有时会有一个问题。这个问题是附加的。这个问题会导致数据丢失。谢谢回答... ERROR: circuit_breaking_exception: [parent] Data too large, data for [indices:data/write/bulk[s]] would be [524374312/500mb], which is larger than the limit of [510027366/486.3mb], real usage: [524323448/500mb], new bytes reserved: [50864/49.6kb], usages [request=0/0b, fielddata=160771183/153.3mb, in_flight_requests=50864/49.6kb, model_inference=0/0b, eql_sequence=0/0b, accounting=6898128/6.5mb]
此部分是批量代码。

func InsertElastic(y []models.CP, ElasticStruct *config.ElasticStruct) {
    fmt.Println("------------------")
    bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
        Index:      enum.IndexName,
        Client:     ElasticStruct.Client,
        FlushBytes: 10e+6,
    })
    if err != nil {
        panic(err)
    }
    start := time.Now().UTC()

    for _, x := range y {
        data, err := json.Marshal(x)
        if err != nil {
            panic(err)
        }
        err = bi.Add(
            context.Background(),
            esutil.BulkIndexerItem{
                Action: "index",

                Body: bytes.NewReader(data),

                OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
                    i++
                },

                OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
                    if err != nil {
                        log.Printf("ERROR: %s", err)
                    } else {
                        log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
                    }
                },
            },
        )
        if err != nil {
            log.Fatalf("Unexpected error: %s", err)
        }
        x++

    }
    if err := bi.Close(context.Background()); err != nil {
        log.Fatalf("Unexpected error: %s", err)
    }
    dur := time.Since(start)
    fmt.Println(dur)
    fmt.Println("Success writing data to elastic : ", i)
    fmt.Println("Success incoming data from pulsar : ", x)
    fmt.Println("Difference : ", x-i)
    fmt.Println("Now : ", time.Now().UTC().String())
    if i < x {
        fmt.Println("FATAL")
    }
    fmt.Println("------------------")

}
8ulbf1ek

8ulbf1ek1#

Tldr;

您的节点上似乎没有足够的JVM堆。
您正在单击circuit breaker以避免ElasticSearch内存不足(OOM)。

解决方案

  • 增加JVM内存,你会发现here的一些文档来调整你的节点。
  • 较小批量请求

相关问题