Why does my Flink TaskManager crash when transferring MySQL data to Iceberg?

yhuiod9q · posted 2023-11-15 in Apache

The source table is about 4 TB, so we are trying to migrate it with Flink as a stream. Ideally, Flink would pull rows from MySQL and forward them to Iceberg one by one; once a row has been inserted into Iceberg, it should no longer occupy any memory in the Flink JVM. However, the TaskManager crashes with org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id localhost:26211-081070 is no longer reachable.

23/10/17 14:43:34 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.sql.SQLException: Java heap space
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:130)
        at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
        at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:916)
        at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:972)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:314)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space
        at com.mysql.cj.protocol.a.SimplePacketReader.readMessageLocal(SimplePacketReader.java:133)
        at com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:102)
        at com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:45)
        at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:62)
        at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:41)
        at com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:66)
        at com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:44)
        at com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:75)
        at com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:42)
        at com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1648)
        at com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:87)
        at com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:48)
        at com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1661)
        at com.mysql.cj.protocol.a.NativeProtocol.readAllResults(NativeProtocol.java:1715)
        at com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:1065)
        at com.mysql.cj.NativeSession.execSQL(NativeSession.java:657)
        at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:893)
        at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:972)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:314)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2256/1694162626.apply(Unknown Source)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
23/10/17 14:43:34 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 0.0 in stage 0.0 (TID 0),5,main]
java.sql.SQLException: Java heap space
        ... [same stack trace as above]
23/10/17 14:43:34 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (ars.momenta.ai executor driver): java.sql.SQLException: Java heap space
        ... [same stack trace as above]
23/10/17 14:43:34 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <unprintable Py4JJavaError object>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
Traceback (most recent call last):
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <exception str() failed>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/utils.py", line 192, in deco
    converted = convert_exception(e.java_exception)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/utils.py", line 170, in convert_exception
    is_instance_of(gw, c, "org.apache.spark.api.python.PythonException")
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 464, in is_instance_of
    return gateway.jvm.py4j.reflection.TypeUtil.isInstanceOf(
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1722, in __getattr__
    raise Py4JError(message)
py4j.protocol.Py4JError: py4j does not exist in the JVM
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/mnt/data/martin.hou/transfer_job/spark_write.py", line 60, in <module>
    sample = df.first().asDict()
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 1938, in first
    return self.head()
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 1924, in head
    rs = self.head(1)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 1926, in head
    return self.take(n)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 868, in take
    return self.limit(num).collect()
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 816, in collect
    with SCCallSiteSync(self._sc):
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/traceback_utils.py", line 81, in __exit__
    self._context._jsc.setCallSite(None)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1320, in __call__
    answer = self.gateway_client.send_command(command)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1036, in send_command
    connection = self._get_connection()
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/clientserver.py", line 284, in _get_connection
    connection = self._create_new_connection()
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/clientserver.py", line 291, in _create_new_connection
    connection.connect_to_java_server()
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/clientserver.py", line 438, in connect_to_java_server
    self.socket.connect((self.java_address, self.java_port))
ConnectionRefusedError: [Errno 111] Connection refused

Here is my code:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1).enable_checkpointing(3000)  # checkpoint every 3 s
table_env = StreamTableEnvironment.create(env)

# source table
table_env.execute_sql("""
    CREATE TABLE src_result (
        id STRING,
        input_md5 STRING,
        output_md5 STRING,
        log STRING,
        metric STRING,
        create_time TIMESTAMP(6),      
        update_time TIMESTAMP(6),
        workflow_id_id STRING,
        error_details STRING,
        error_stage STRING,
        error_type STRING
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://xxx',
        'table-name' = 'result',
        'username' = 'ars_dev',
        'password' = '01234567'
    );
""")

# target iceberg
table_env.execute_sql("CREATE CATALOG iceberg WITH ("
                    "'type'='iceberg', "
                    "'catalog-type'='hive', "
                    "'uri'='thrift://xxx',"
                    "'warehouse'='xxx')")

# start migration
t_res = table_env.execute_sql("""
    INSERT INTO iceberg.ars.results
    SELECT 
        id,
        input_md5,
        output_md5,
        log,
        metric,
        CAST(create_time AS STRING),
        CAST(update_time AS STRING),
        workflow_id_id,
        error_details,
        error_stage,
        error_type  
    FROM src_result
""")


I'm new to Flink. I thought that INSERT INTO xx SELECT * FROM xx might pull the whole table into memory at once. However, when I ran just SELECT * FROM xx and streamed the rows, the same error occurred after about ~10k entries. How is that possible? Do I need to free memory manually?

Answer from au9on6nz:

The Flink JDBC source connector can only act as a bounded source, which means it reads in batch mode; it does not stream results the way the Kafka or FileSystem connectors do. So when your query runs, everything it selects has to be pulled from the source and held until it can be written to your sink. On top of that, the MySQL JDBC driver buffers the entire ResultSet in client memory by default, which is what the com.mysql.cj.protocol.a.ResultsetRowReader frames under java.sql.SQLException: Java heap space in your log point to. That is most likely why Flink runs out of memory.
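
If you want to keep the JDBC source, two documented connector options may help. The sketch below is untested against your setup, and everything marked hypothetical is an assumption, not something from your schema. 'scan.fetch-size' is handed to the driver as the JDBC fetch size, and MySQL Connector/J treats Integer.MIN_VALUE (-2147483648) as "stream rows one at a time" instead of buffering the whole result set (some Flink versions special-case this value, so check the docs for your connector version; alternatively, append useCursorFetch=true to the JDBC URL and use a positive fetch size). The 'scan.partition.*' options split the scan into many independent bounded queries; the partition column must be numeric, date, or timestamp, so your STRING id won't work, and the example assumes a hypothetical auto-increment BIGINT column seq_id with made-up bounds.

# Sketch only: reuses the table_env from the script above. The connector
# options are documented Flink JDBC options; the column seq_id and the
# partition bounds are hypothetical placeholders.
table_env.execute_sql("""
    CREATE TABLE src_result_chunked (
        seq_id BIGINT,              -- hypothetical auto-increment key
        id STRING,
        input_md5 STRING,
        output_md5 STRING,
        log STRING,
        metric STRING,
        create_time TIMESTAMP(6),
        update_time TIMESTAMP(6),
        workflow_id_id STRING,
        error_details STRING,
        error_stage STRING,
        error_type STRING
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://xxx',
        'table-name' = 'result',
        'username' = 'ars_dev',
        'password' = '01234567',
        -- stream rows from MySQL instead of buffering the whole result set
        'scan.fetch-size' = '-2147483648',
        -- read the table as many small bounded queries instead of one big one;
        -- replace the bounds with the real MIN(seq_id) / MAX(seq_id)
        'scan.partition.column' = 'seq_id',
        'scan.partition.num' = '1000',
        'scan.partition.lower-bound' = '1',
        'scan.partition.upper-bound' = '2000000000'
    );
""")

# The INSERT itself is unchanged apart from the source table name:
table_env.execute_sql("""
    INSERT INTO iceberg.ars.results
    SELECT id, input_md5, output_md5, log, metric,
           CAST(create_time AS STRING), CAST(update_time AS STRING),
           workflow_id_id, error_details, error_stage, error_type
    FROM src_result_chunked
""")

Even with these options the source stays bounded, but each partition's result set should be small enough to be written to Iceberg before the next one is fetched, which ought to keep the TaskManager heap roughly flat instead of growing with the table.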
