Abnormal output in skywalking-api.log when integrating Seata 2.0.0 with SkyWalking 9.0.0

dtcbnfnu  posted 4 months ago  in Other
  • I have searched the issues of this repository and believe that this is not a duplicate.

Ⅰ. Issue Description

In 2.0.0, the interceptors implemented in /ext/apm-seata-skywalking-plugin throw a type-cast exception when converting their input arguments.

Ⅱ. Describe what happened

If there is an exception, please attach the exception trace:

ERROR 2024-06-13 14:02:26.871 NettyServerNIOWorker_1_2_2 InstMethodsInter : class[class io.seata.core.rpc.processor.server.ServerHeartbeatProcessor] after method[process] intercept failure
java.lang.ClassCastException: io.netty.channel.DefaultChannelHandlerContext cannot be cast to io.seata.core.protocol.RpcMessage
at io.seata.apm.skywalking.plugin.RemotingProcessorProcessInterceptor.afterMethod(RemotingProcessorProcessInterceptor.java:70)
at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:97)
at io.seata.core.rpc.processor.server.ServerHeartbeatProcessor.process(ServerHeartbeatProcessor.java)
at io.seata.core.rpc.netty.AbstractNettyRemoting.processMessage(AbstractNettyRemoting.java:306)
at io.seata.core.rpc.netty.AbstractNettyRemotingServer$ServerHandler.channelRead(AbstractNettyRemotingServer.java:169)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)

ERROR 2024-06-13 14:02:26.871 NettyServerNIOWorker_1_2_2 InstMethodsInter : class[class io.seata.core.rpc.netty.NettyRemotingServer] after method[sendAsync] intercept failure
java.lang.ClassCastException: io.netty.channel.socket.nio.NioSocketChannel cannot be cast to io.seata.core.protocol.RpcMessage
at io.seata.apm.skywalking.plugin.NettyRemotingClientSendSyncInterceptor.afterMethod(NettyRemotingClientSendSyncInterceptor.java:71)
at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:97)
at io.seata.core.rpc.netty.AbstractNettyRemoting.sendAsync(AbstractNettyRemoting.java)
at io.seata.core.rpc.netty.AbstractNettyRemotingServer.sendAsyncResponse(AbstractNettyRemotingServer.java:104)
at io.seata.core.rpc.processor.server.ServerHeartbeatProcessor.$sw$original$process$hr32d22(ServerHeartbeatProcessor.java:48)
at io.seata.core.rpc.processor.server.ServerHeartbeatProcessor.$sw$original$process$hr32d22$accessor$$sw$aeun762(ServerHeartbeatProcessor.java)
at io.seata.core.rpc.processor.server.ServerHeartbeatProcessor$$sw$auxiliary$in3hfi1.call(Unknown Source)
The beforeMethod method also throws a type-cast exception.
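To make the failure mode concrete, here is a minimal, self-contained sketch. It assumes, as the two traces suggest, that the intercepted methods receive a channel-related object as their first argument and the RpcMessage in a later position. FakeChannelContext and FakeRpcMessage are hypothetical stand-ins, not the real Netty or Seata classes.

```java
final class CastDemo {
    private CastDemo() {}

    /** Hypothetical stand-in for the channel object passed as the first argument. */
    public static final class FakeChannelContext {}

    /** Hypothetical stand-in for io.seata.core.protocol.RpcMessage. */
    public static final class FakeRpcMessage {
        private final Object body;

        public FakeRpcMessage(Object body) {
            this.body = body;
        }

        public Object getBody() {
            return body;
        }
    }

    /** Mirrors the failing pattern: assume the message is always argument 0. */
    static FakeRpcMessage blindCast(Object[] allArguments) {
        return (FakeRpcMessage) allArguments[0];
    }

    /** Returns true when the blind cast throws ClassCastException for these arguments. */
    static boolean blindCastFails(Object[] allArguments) {
        try {
            blindCast(allArguments);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```

With an argument array shaped like (context, message), blindCastFails returns true. With the message in position 0 it returns false, which would explain why the plugin only breaks on some intercepted methods.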

Ⅲ. Describe what you expected to happen

The exceptions mainly originate in:
DefaultCoreDoGlobalCommitInterceptor, NettyRemotingClientSendSyncInterceptor, RemotingProcessorProcessInterceptor, and SWSeataUtils

In the DefaultCoreDoGlobalCommitInterceptor class:
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        MethodInterceptResult result) throws Throwable {
    // Scan the arguments instead of assuming allArguments[0] is the RpcMessage.
    RpcMessage rpcMessage = null;
    for (Object arg : allArguments) {
        if (arg instanceof RpcMessage) {
            rpcMessage = (RpcMessage) arg;
            if (!(rpcMessage.getBody() instanceof AbstractMessage)) {
                return;
            }
        }
    }

    /* Where the exception occurred (original code):
    RpcMessage rpcMessage = (RpcMessage) allArguments[0];
    if (!(rpcMessage.getBody() instanceof AbstractMessage)) {
        return;
    }*/

    ...
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        Object ret) throws Throwable {
    // Stop the span only for arguments that really are RpcMessage instances.
    for (Object arg : allArguments) {
        if (arg instanceof RpcMessage) {
            RpcMessage rpcMessage = (RpcMessage) arg;
            if (rpcMessage.getBody() instanceof AbstractMessage) {
                ContextManager.stopSpan();
            }
        }
    }

    /* Where the exception occurred (original code):
    RpcMessage rpcMessage = (RpcMessage) allArguments[0];
    if (rpcMessage.getBody() instanceof AbstractMessage) {
        ContextManager.stopSpan();
    }*/
    return ret;
}

In the NettyRemotingClientSendSyncInterceptor class:
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        Object ret) throws Throwable {
    for (Object arg : allArguments) {
        if (arg instanceof RpcMessage) {
            RpcMessage rpcMessage = (RpcMessage) arg;
            if (rpcMessage.getBody() instanceof AbstractMessage) {
                ContextManager.stopSpan();
            }
        }
    }

    /* Original code:
    RpcMessage rpcMessage = (RpcMessage) allArguments[0];
    if (rpcMessage.getBody() instanceof AbstractMessage) {
        ContextManager.stopSpan();
    }*/

    return ret;
}

In the RemotingProcessorProcessInterceptor class:
@Override
public Object afterMethod(EnhancedInstance objInst, Method method,
        Object[] allArguments, Class<?>[] argumentsTypes,
        Object ret) throws Throwable {
    for (Object arg : allArguments) {
        if (arg instanceof RpcMessage) {
            RpcMessage rpcMessage = (RpcMessage) arg;
            if (rpcMessage.getBody() instanceof AbstractMessage) {
                ContextManager.stopSpan();
            }
        }
    }

    /* Original code:
    RpcMessage rpcMessage = (RpcMessage) allArguments[0];
    if (rpcMessage.getBody() instanceof AbstractMessage) {
        ContextManager.stopSpan();
    }*/

    return ret;
}
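The three afterMethod bodies repeat the same scan over allArguments, so one option would be to factor it into a small helper. The sketch below is only an illustration: ArgumentScan and firstOfType are hypothetical names, not part of the Seata plugin or the SkyWalking agent API, and the helper is written generically so it runs without either library on the classpath.

```java
import java.util.Optional;

final class ArgumentScan {
    private ArgumentScan() {}

    /**
     * Returns the first argument that is an instance of the given type,
     * or an empty Optional when there is none (or the array itself is null).
     */
    static <T> Optional<T> firstOfType(Object[] allArguments, Class<T> type) {
        if (allArguments == null) {
            return Optional.empty();
        }
        for (Object arg : allArguments) {
            // isInstance is false for null, so null arguments are skipped safely
            if (type.isInstance(arg)) {
                return Optional.of(type.cast(arg));
            }
        }
        return Optional.empty();
    }
}
```

In an interceptor this might be called as ArgumentScan.firstOfType(allArguments, RpcMessage.class). Note one behavioral difference from the loops above: the helper stops at the first match, whereas the loops inspect every RpcMessage in the array.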

In the SWSeataUtils class:
public static String convertXid(RpcMessage rpcMessage) {
    String xid = null;
    // After the adjustment: check for a null message or body before reflecting on it
    if (rpcMessage != null && rpcMessage.getBody() != null) {
        if (rpcMessage.getBody() instanceof AbstractMessage) {
            AbstractMessage subMessage = (AbstractMessage) rpcMessage.getBody();
            String requestSimpleName = rpcMessage.getBody().getClass().getSimpleName();

            try {
                xid = SWSeataConstants.TRANSACTION_TRANSMISSION_CLASS_NAME_MAPPING.get(requestSimpleName) != null
                        ? (String) SWSeataConstants.TRANSACTION_TRANSMISSION_CLASS_NAME_MAPPING.get(requestSimpleName)
                                .getDeclaredMethod("getXid").invoke(subMessage)
                        : xid;
            } catch (Throwable e) {
                LOGGER.error("convert seata xid failure", e);
            }
        }
    }

    return xid;
}
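The lookup in convertXid (map the body's simple class name to a class, then call getXid reflectively) can be tried in isolation. The following is a rough sketch under stated assumptions: DemoCommitRequest, DemoHeartbeat, and the MAPPING contents are hypothetical stand-ins for the Seata message classes and for SWSeataConstants.TRANSACTION_TRANSMISSION_CLASS_NAME_MAPPING, whose real entries are not shown in this issue.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

final class XidLookup {
    private XidLookup() {}

    /** Hypothetical stand-in for a Seata request that carries an xid. */
    public static class DemoCommitRequest {
        private final String xid;

        public DemoCommitRequest(String xid) {
            this.xid = xid;
        }

        public String getXid() {
            return xid;
        }
    }

    /** Hypothetical stand-in for a message with no xid, such as a heartbeat. */
    public static class DemoHeartbeat {}

    // Simple class name -> class that declares getXid(); contents are illustrative only.
    private static final Map<String, Class<?>> MAPPING = new HashMap<>();

    static {
        MAPPING.put("DemoCommitRequest", DemoCommitRequest.class);
    }

    /** Returns the xid of a mapped body, or null when the body is null, unmapped, or the call fails. */
    static String convertXid(Object body) {
        if (body == null) {
            return null;
        }
        Class<?> declaring = MAPPING.get(body.getClass().getSimpleName());
        if (declaring == null) {
            return null;
        }
        try {
            Method getXid = declaring.getDeclaredMethod("getXid");
            return (String) getXid.invoke(body);
        } catch (ReflectiveOperationException | IllegalArgumentException e) {
            return null;
        }
    }
}
```

Looking the map entry up once and returning early keeps the unmapped case (for example a heartbeat) off the reflective path entirely.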

The code above is the adjustment that passed my testing; it is offered for reference only.

Ⅵ. Environment:

  • JDK version(e.g. java -version ): jdk 8
  • Seata client/server version: 2.0.0
  • Database version:
  • OS(e.g. uname -a ):
  • Others: skywalking-java-agent 9.0.0-alpine

1szpjjfi  #1

You could consider submitting a PR for the SkyWalking Seata plugin.
