目标:我的目标是使用Kafka将消息发送到使用Kafka Connect的Cassandra接收器。
我已经部署了Kafka和Cassandra,并且能够单独使用它们中的每一个--向Kafka发送数据、使用生产者传递消息和使用者使用消息都没有问题。使用cqlsh创建表并向其中插入数据也没有问题。但是,* 每当我尝试部署DataStax Apache Kafka Connector时,Cassandra似乎崩溃了。*
我正在尝试学习如何使用Kafka Connect,只使用一个Kafka生产者、代理和一个Cassandra密钥空间,使用独立模式。我已经按照DataStax上的说明配置了***connect-standalone. properties***和***cassandra-sink-standalone. properties***:https://docs.datastax.com/en/kafka/doc/kafka/kafkaStringJson.html
- 连接-独立.属性**
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path= *install_location*/kafka-connect-cassandra-sink-1.4.0.jar
- cassandra接收器独立.属性**
name=stocks-sink
connector.class=com.datastax.kafkaconnector.DseSinkConnector
tasks.max=1
topics=stocks_topic
topic.stocks_topic.stocks_keyspace.stocks_table.mapping = symbol=value.symbol, ts=value.ts, exchange=value.exchange, industry=value.industry, name=key, value=value.value
然后,使用bin/connect-standalone.sh connect-standalone.properties cassandra-sink-standalone.properties
启动Kafka连接器。
在我尝试启动Kafka Connector时,大约95%的情况下Cassandra会崩溃。运行bin/nodetool status
会显示以下消息:
节点工具:无法连接到"127.0.0.1:7199"-连接异常:'连接被拒绝(连接被拒绝)'
在 * system. log * 和 * debug. log * 日志中,没有任何迹象表明Cassandra已经崩溃。最后一行仍然是:
信息[主页] 2023 - 01 - 31 00:00:00,143存储服务. java:2806-节点本地主机/127.0.0.1:7000状态跳转到正常
在Kafka Connect日志中,错误消息指出:
[2023-01-31 15:24:47,803] INFO [plc-sink|task-0] DataStax Java driver for Apache Cassandra(R) (com.datastax.oss:java-driver-core) version 4.6.0 (com.datastax.oss.driver.internal.core.DefaultMavenCoordinates:37)
[2023-01-31 15:24:47,947] INFO [plc-sink|task-0] Could not register Graph extensions; this is normal if Tinkerpop was explicitly excluded from classpath (com.datastax.oss.driver.internal.core.context.InternalDriverContext:540)
[2023-01-31 15:24:47,948] INFO [plc-sink|task-0] Could not register Reactive extensions; this is normal if Reactive Streams was explicitly excluded from classpath (com.datastax.oss.driver.internal.core.context.InternalDriverContext:559)
[2023-01-31 15:24:47,997] INFO [plc-sink|task-0] Using native clock for microsecond precision (com.datastax.oss.driver.internal.core.time.Clock:40)
[2023-01-31 15:24:47,999] INFO [plc-sink|task-0] [s0] No contact points provided, defaulting to /127.0.0.1:9042 (com.datastax.oss.driver.internal.core.metadata.MetadataManager:134)
[2023-01-31 15:24:48,190] WARN [plc-sink|task-0] [s0] Error connecting to Node(endPoint=/127.0.0.1:9042, hostId=null, hashCode=3247c5e4), trying next node (ConnectionInitException: [s0|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (java.nio.channels.ClosedChannelException)) (com.datastax.oss.driver.internal.core.control.ControlConnection:34)
[2023-01-31 15:24:48,200] ERROR [plc-sink|task-0] WorkerSinkTask{id=plc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:196)
com.datastax.oss.driver.api.core.AllNodesFailedException: Could not reach any contact point, make sure you've provided valid addresses (showing first 1 nodes, use getAllErrors() for more): Node(endPoint=/127.0.0.1:9042, hostId=null, hashCode=3247c5e4): [com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s0|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (java.nio.channels.ClosedChannelException)]
at com.datastax.oss.driver.api.core.AllNodesFailedException.copy(AllNodesFailedException.java:141)
at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:149)
at com.datastax.oss.driver.api.core.session.SessionBuilder.build(SessionBuilder.java:612)
at com.datastax.oss.kafka.sink.state.LifeCycleManager.buildCqlSession(LifeCycleManager.java:518)
at com.datastax.oss.kafka.sink.state.LifeCycleManager.lambda$startTask$0(LifeCycleManager.java:113)
at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
at com.datastax.oss.kafka.sink.state.LifeCycleManager.startTask(LifeCycleManager.java:109)
at com.datastax.oss.kafka.sink.CassandraSinkTask.start(CassandraSinkTask.java:83)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:312)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:187)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Suppressed: com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s0|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (java.nio.channels.ClosedChannelException)
at com.datastax.oss.driver.internal.core.channel.ProtocolInitHandler$InitRequest.fail(ProtocolInitHandler.java:342)
at com.datastax.oss.driver.internal.core.channel.ChannelHandlerRequest.writeListener(ChannelHandlerRequest.java:87)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:183)
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30)
at com.datastax.oss.driver.internal.core.channel.ChannelHandlerRequest.send(ChannelHandlerRequest.java:76)
at com.datastax.oss.driver.internal.core.channel.ProtocolInitHandler$InitRequest.send(ProtocolInitHandler.java:183)
at com.datastax.oss.driver.internal.core.channel.ProtocolInitHandler.onRealConnect(ProtocolInitHandler.java:118)
at com.datastax.oss.driver.internal.core.channel.ConnectInitHandler.lambda$connect$0(ConnectInitHandler.java:57)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
Suppressed: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:9042
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.nio.channels.ClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:921)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:897)
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:748)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:740)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:726)
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:748)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:763)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:806)
at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1025)
at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:294)
at com.datastax.oss.driver.internal.core.channel.ChannelHandlerRequest.send(ChannelHandlerRequest.java:75)
... 20 more
在5%的时间里,Cassandra实际上没有崩溃,下面的消息显示在Kafka连接的日志:
[2023-01-31 15:41:32,839] INFO [plc-sink|task-0] DataStax Java driver for Apache Cassandra(R) (com.datastax.oss:java-driver-core) version 4.6.0 (com.datastax.oss.driver.internal.core.DefaultMavenCoordinates:37)
[2023-01-31 15:41:32,981] INFO [plc-sink|task-0] Could not register Graph extensions; this is normal if Tinkerpop was explicitly excluded from classpath (com.datastax.oss.driver.internal.core.context.InternalDriverContext:540)
[2023-01-31 15:41:32,982] INFO [plc-sink|task-0] Could not register Reactive extensions; this is normal if Reactive Streams was explicitly excluded from classpath (com.datastax.oss.driver.internal.core.context.InternalDriverContext:559)
[2023-01-31 15:41:33,037] INFO [plc-sink|task-0] Using native clock for microsecond precision (com.datastax.oss.driver.internal.core.time.Clock:40)
[2023-01-31 15:41:33,040] INFO [plc-sink|task-0] [s0] No contact points provided, defaulting to /127.0.0.1:9042 (com.datastax.oss.driver.internal.core.metadata.MetadataManager:134)
[2023-01-31 15:41:33,254] INFO [plc-sink|task-0] [s0] Failed to connect with protocol DSE_V2, retrying with DSE_V1 (com.datastax.oss.driver.internal.core.channel.ChannelFactory:224)
[2023-01-31 15:41:33,263] INFO [plc-sink|task-0] [s0] Failed to connect with protocol DSE_V1, retrying with V4 (com.datastax.oss.driver.internal.core.channel.ChannelFactory:224)
[2023-01-31 15:41:34,091] INFO [plc-sink|task-0] WorkerSinkTask{id=plc-sink-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:313)
[2023-01-31 15:41:34,092] INFO [plc-sink|task-0] WorkerSinkTask{id=plc-sink-0} Executing sink task (org.apache.kafka.connect.runtime.WorkerSinkTask:198)
...
版本:
- ApacheCassandra4.0.7
- ApacheKafka3.3.1
- DataStaxApacheKafka连接器1.4.0
我目前在Windows 11上使用WSL2 Ubuntu 20.04.5,规格如下:
- CPU:4核
- 内存:8GB内存
- 磁盘(固态硬盘):250 GB
看到它实际上工作了5%的时间,我怀疑这是https://community.datastax.com/questions/6947/index.html中概述的OOM问题(我有时碰巧有足够的内存?)。我已经尝试了本文中的解决方案,但它没有帮助。我如何配置Cassandra/Kafka Connect来避免这个问题?这只是需要一台有更多内存的计算机的问题吗?
1条答案
按热度按时间chhkpiq41#
我认为你说记忆力是个问题是对的。
我有一个“小”的Windows表面专业版,我用它来复制像你这样的问题。我也在这台笔记本电脑上运行Ubuntu 20.04和WSL2。
默认情况下,Windows会将一半的系统内存分配给WSL2,所以在我的8GB内存以下的安装中,Ubuntu安装占用了3.7GB内存。Cassandra的普通安装(开箱即用的零配置)从1.3GB内存开始,所以只有2.4GB留给其他所有东西。
我怀疑当你在同一个节点上启动Kafka时,Ubuntu会耗尽内存并触发Linux
oom-killer
。虽然最终结果是相似的,但触发器与what I described in the post you linked略有不同,所以我建议显式设置disk_access_mode
在这种情况下没有帮助。解决方法是通过在
conf/cassandra-env.sh
中设置MAX_HEAP_SIZE
,将Cassandra配置为仅分配1GB内存:Kafka默认配置为1GB,但如果不是,请在
bin/kafka-server-start.sh
中设置以下内容:通过设置这两个,应该有超过1GB留给Ubuntu,并希望允许您运行您的测试。