The source table is roughly 4 TB, so we tried to migrate it with Flink as a stream. Ideally, Flink would pull rows from MySQL and hand them to Iceberg one at a time; once a row has been written to Iceberg, it should no longer occupy any memory in the Flink JVM. Instead, the task manager crashes with org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id localhost:26211-081070 is no longer reachable.
A Spark-based attempt at the same transfer (transfer_job/spark_write.py, see the traceback below) failed the same way, with the executor heap exhausted while reading the JDBC result set:
23/10/17 14:43:34 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.sql.SQLException: Java heap space
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:130)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:916)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:972)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:314)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space
at com.mysql.cj.protocol.a.SimplePacketReader.readMessageLocal(SimplePacketReader.java:133)
at com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:102)
at com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:45)
at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:62)
at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:41)
at com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:66)
at com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:44)
at com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:75)
at com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:42)
at com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1648)
at com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:87)
at com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:48)
at com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1661)
at com.mysql.cj.protocol.a.NativeProtocol.readAllResults(NativeProtocol.java:1715)
at com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:1065)
at com.mysql.cj.NativeSession.execSQL(NativeSession.java:657)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:893)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:972)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:314)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2256/1694162626.apply(Unknown Source)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
23/10/17 14:43:34 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 0.0 in stage 0.0 (TID 0),5,main]
[... same java.sql.SQLException: Java heap space stack trace as above, elided ...]
23/10/17 14:43:34 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (ars.momenta.ai executor driver): java.sql.SQLException: Java heap space
[... same stack trace as above, elided ...]
23/10/17 14:43:34 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/utils.py", line 190, in deco
return f(*a, **kw)
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <unprintable Py4JJavaError object>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/clientserver.py", line 516, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
response = connection.send_command(command)
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/clientserver.py", line 539, in send_command
raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
Traceback (most recent call last):
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/utils.py", line 190, in deco
return f(*a, **kw)
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <exception str() failed>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 817, in collect
sock_info = self._jdf.collectToPython()
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/utils.py", line 192, in deco
converted = convert_exception(e.java_exception)
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/utils.py", line 170, in convert_exception
is_instance_of(gw, c, "org.apache.spark.api.python.PythonException")
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 464, in is_instance_of
return gateway.jvm.py4j.reflection.TypeUtil.isInstanceOf(
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1722, in __getattr__
raise Py4JError(message)
py4j.protocol.Py4JError: py4j does not exist in the JVM
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/mnt/data/martin.hou/transfer_job/spark_write.py", line 60, in <module>
sample = df.first().asDict()
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 1938, in first
return self.head()
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 1924, in head
rs = self.head(1)
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 1926, in head
return self.take(n)
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 868, in take
return self.limit(num).collect()
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 816, in collect
with SCCallSiteSync(self._sc):
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/traceback_utils.py", line 81, in __exit__
self._context._jsc.setCallSite(None)
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1320, in __call__
answer = self.gateway_client.send_command(command)
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1036, in send_command
connection = self._get_connection()
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/clientserver.py", line 284, in _get_connection
connection = self._create_new_connection()
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/clientserver.py", line 291, in _create_new_connection
connection.connect_to_java_server()
File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/clientserver.py", line 438, in connect_to_java_server
self.socket.connect((self.java_address, self.java_port))
ConnectionRefusedError: [Errno 111] Connection refused
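Judging from the readAllResults frames above, the MySQL driver buffered the entire result set in one shot until the executor heap filled. For the Spark path, I assume the fix is a chunked, cursor-fetching read; a minimal sketch, assuming a numeric auto-increment column (called seq here — the real table only exposes a STRING id, so this column is hypothetical):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("transfer_job").getOrCreate()

# useCursorFetch=true makes Connector/J honor fetchsize instead of
# buffering the whole result set (the readAllResults frames above).
# `seq` is a HYPOTHETICAL numeric column used only to range-partition
# the scan into independent chunks.
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://xxx?useCursorFetch=true")
    .option("dbtable", "result")
    .option("user", "ars_dev")
    .option("password", "01234567")
    .option("fetchsize", "1000")        # rows per driver round-trip
    .option("partitionColumn", "seq")   # hypothetical numeric column
    .option("lowerBound", "0")
    .option("upperBound", "2000000000")
    .option("numPartitions", "400")     # 400 independent range scans
    .load()
)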
Here is my Flink code:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# single-threaded job, checkpoint every 3 seconds
env.set_parallelism(1).enable_checkpointing(3000)
table_env = StreamTableEnvironment.create(env)
# source table
table_env.execute_sql("""
CREATE TABLE src_result (
id STRING,
input_md5 STRING,
output_md5 STRING,
log STRING,
metric STRING,
create_time TIMESTAMP(6),
update_time TIMESTAMP(6),
workflow_id_id STRING,
error_details STRING,
error_stage STRING,
error_type STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xxx',
'table-name' = 'result',
'username' = 'ars_dev',
'password' = '01234567'
);
""")
# target iceberg
table_env.execute_sql("CREATE CATALOG iceberg WITH ("
"'type'='iceberg', "
"'catalog-type'='hive', "
"'uri'='thrift://xxx',"
"'warehouse'='xxx')")
# start migration
t_res = table_env.execute_sql("""
INSERT INTO iceberg.ars.results
SELECT
id,
input_md5,
output_md5,
log,
metric,
CAST(create_time AS STRING),
CAST(update_time AS STRING),
workflow_id_id,
error_details,
error_stage,
error_type
FROM src_result
""")
I'm new to Flink. I thought INSERT INTO xx SELECT * FROM xx might pull the whole table into memory at once. However, when I ran just SELECT * FROM xx and iterated over the results as a stream, the same error occurred after reading roughly 10k rows. How is that possible? Do I need to release memory manually?
1 Answer
The Flink JDBC source connector can only act as a bounded source, i.e. it performs a batch read. It does not stream results the way the Kafka or FileSystem connectors do. When you run the query, the connector selects everything it needs from the source table, and those rows have to be held until they can be written to your sink. With a 4 TB table, that is almost certainly what drives Flink out of memory.
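One mitigation is to keep the read bounded but stop materializing it in one piece: enable cursor fetch on the MySQL driver and split the scan into ranged partitions with the connector's scan.* options. A minimal sketch, reusing the table_env from the question; note that scan.partition.column must be a numeric, date, or timestamp column, so the auto-increment seq column below is a hypothetical stand-in (the posted schema only has a STRING id), and the schema is trimmed for brevity:
# Cursor fetch plus a partitioned scan, so each subtask only holds one
# fetch batch in memory at a time. `seq` is a HYPOTHETICAL BIGINT column.
table_env.execute_sql("""
CREATE TABLE src_result_chunked (
    seq BIGINT,
    id STRING,
    create_time TIMESTAMP(6)
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://xxx?useCursorFetch=true',
    'table-name' = 'result',
    'username' = 'ars_dev',
    'password' = '01234567',
    'scan.fetch-size' = '1000',
    'scan.partition.column' = 'seq',
    'scan.partition.num' = '400',
    'scan.partition.lower-bound' = '0',
    'scan.partition.upper-bound' = '2000000000'
);
""")
Even chunked, the JDBC source remains a bounded batch scan. For a genuinely streaming copy out of MySQL (initial snapshot plus binlog tailing into Iceberg), the Flink CDC MySQL connector is the usual route.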