java—哪种方法是读取更改查找和丰富流式输入集合的最佳方法?

kpbwa7wx  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(254)

我使用的是apachebeam,流式收集容量为1.5gb。我的查找表是一个jdbciomysql响应。
当我在没有侧输入的情况下运行管道时,我的工作将在大约2分钟内完成。当我用侧边输入运行我的作业时,我的作业将永远不会完成,会死掉。
下面是我用来存储查找的代码(~1m条记录)

PCollectionView<Map<String,String>> sideData = pipeline.apply(JdbcIO.<KV<String, String>>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
       "com.mysql.jdbc.Driver", "jdbc:mysql://ip")
      .withUsername("username")
      .withPassword("password"))
      .withQuery("select a_number from cell")
      .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
      .withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() {
      public KV<String, String> mapRow(ResultSet resultSet) throws Exception {
        return KV.of(resultSet.getString(1), resultSet.getString(1));
      }
})).apply(View.asMap());

这是我的流媒体收藏代码

pipeline
.apply("ReadMyFile", TextIO.read().from("/home/data/**")
.watchForNewFiles(Duration.standardSeconds(60),  Watch.Growth.<String>never()))
.apply(Window.<String>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))))
.accumulatingFiredPanes()
.withAllowedLateness(ONE_DAY))

下面是我的pardo的代码,用于迭代每个事件行(共10m条记录)

.apply(ParDo.of(new DoFn<KV<String,Integer>,KV<String,Integer>>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    KV<String,Integer> i = c.element();
    String sideInputData = c.sideInput(sideData).get(i.getKey());
    if (sideInputData == null) {
      c.output(i);
    } 
  }
 }).withSideInputs(sideData));

我使用的是flink集群,但使用的是directrunner输出。
集群:
2 cpu 6核24gb ram
我做错什么了?我关注过这个

jchrr9hc

jchrr9hc1#

解决方案是创建一个“缓存”Map。
sideinput只触发一次,然后我将它缓存到一个与map等效的sucture中。
所以,我要避免为每个processelement做sideinput。

.apply(ParDo.of(new DoFn<KV<String,Integer>,KV<String,Integer>>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
   if (isFirstTime) {
        myList = c.sideInput(sideData);
    }
    isFirstTime = false;
    boolean result = myList.containsKey(c.element().getKey());         
    if (result == false) {
      c.output(i);
    } 
  }
 }).withSideInputs(sideData));
5gfr0r5j

5gfr0r5j2#

如果用更少的数据运行,我怀疑程序正在耗尽java进程的所有内存。您可以通过jvisualvm或jconsole监视它。有许多文章涉及这个问题,我只是偶然发现了这一个快速谷歌搜索。
如果内存耗尽,您的java进程通常忙于清理内存,您会看到性能的巨大下降。在某个时刻,java放弃并失败了。
为了解决这个问题,增加java堆的大小就足够了。你如何增加它取决于你如何和在哪里执行它。看看java的 -Xmx 参数或梁中的某个堆选项。

相关问题