Atmosphere with Kafka does not send broadcast messages

Posted by zengzsys on 2021-06-04 in Kafka

I have this dependency for Atmosphere:

<dependency>
    <groupId>org.atmosphere</groupId>
    <artifactId>atmosphere-runtime</artifactId>
    <version>2.5.4</version>
</dependency>

And this one for Kafka:

<dependency>
  <groupId>org.atmosphere</groupId>
  <artifactId>atmosphere-kafka</artifactId>
  <version>2.5.2</version>
</dependency>

The problem is as follows. With plain Atmosphere (without the Kafka library), execution reaches line 658 of DefaultBroadcaster.class:

if (resources.isEmpty()) {
    logger.trace("No resource available for {} and message {}", getID(), finalMsg);
    entryDone(deliver.future);
    if (cacheForSet != null) {
        cacheForSet.clear();
    }
    return;
}

In this mode, the "resources" object is populated when execution reaches this check:

AtmosphereResource{
           uuid=5348f619-df31-4bf6-a4eb-654ba64ef886,
           transport=WEBSOCKET,
           isInScope=true,
           isResumed=false,
           isCancelled=false,
           isSuspended=true,
           broadcasters=/clinicalevent/manager,
           isClosedByClient=false,
           isClosedByApplication=false,
           action=Action{timeout=-1, type=SUSPEND}}

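With the Kafka library, the "resources" object is empty at the same point. Because it is empty, the method returns early and nothing is broadcast to the browser.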
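
To make the symptom concrete, here is a toy model of that guard. It is not Atmosphere's code: ToyBroadcaster, addResource, broadcast and delivered are hypothetical names, and the only thing it reproduces is the early return when no resource is registered.

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these names come from Atmosphere.
final class ToyBroadcaster {
    // Subscriber id -> messages delivered to that subscriber.
    private final Map<String, List<String>> resources = new LinkedHashMap<>();

    void addResource(String uuid) {
        resources.putIfAbsent(uuid, new ArrayList<>());
    }

    // Returns how many subscribers received the message.
    int broadcast(String message) {
        if (resources.isEmpty()) {
            // Same shape as the guard quoted above: nothing registered, nothing written.
            return 0;
        }
        for (List<String> inbox : resources.values()) {
            inbox.add(message);
        }
        return resources.size();
    }

    List<String> delivered(String uuid) {
        return resources.getOrDefault(uuid, Collections.emptyList());
    }

    public static void main(String[] args) {
        ToyBroadcaster withResource = new ToyBroadcaster();
        withResource.addResource("5348f619");
        System.out.println(withResource.broadcast("hello")); // one subscriber registered

        ToyBroadcaster empty = new ToyBroadcaster();
        System.out.println(empty.broadcast("hello")); // no subscriber registered
    }
}

The second case corresponds to what is observed with atmosphere-kafka on the classpath: the message reaches the broadcaster, but no resource is attached to it at delivery time.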
This is my Atmosphere resource class:

package eu.dedalus.sop.o4c.servlets.atmosphere.clinicalevent;
import java.io.IOException;
import org.atmosphere.config.service.Disconnect;
import org.atmosphere.config.service.ManagedService;
import org.atmosphere.config.service.Message;
import org.atmosphere.config.service.Ready;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResourceEvent;
import com.owlike.genson.Genson;
import eu.dedalus.sop.log4j.LogManagerSOP;
import eu.dedalus.sop.log4j.LoggerSOP;
import eu.dedalus.sop.o4c.models.common.atmosphere.DataEntryOperation;

@ManagedService(path = "/clinicalevent/manager")
public final class ClinicalEventManager {

    private static final LoggerSOP logger = LogManagerSOP.getLogger(ClinicalEventManager.class);
    private static final Genson genson = new Genson();

    @Ready
    public final void onReady(final AtmosphereResource r) {
        logger.info("Browser " + r.uuid() + " connected.");
    }

    @Disconnect
    public final void onDisconnect(final AtmosphereResourceEvent event) {
        if (event.isCancelled()) {
            logger.info("Browser " + event.getResource().uuid() + " unexpectedly disconnected");
        } else if (event.isClosedByClient()) {
            logger.info("Browser " + event.getResource().uuid() + " closed the connection");
        }
    }

    @Message(encoders = {
            ClinicalEventManagerEncoderDecoder.class
    }, decoders = {
            ClinicalEventManagerEncoderDecoder.class
    })
    public final DataEntryOperation onMessage(final DataEntryOperation message) throws IOException {
        logger.info(message.getUser() + " just sent " + genson.serialize(message));
        return message;
    }
}

This is my Kafka configuration (kafka.properties):

bootstrap.servers=localhost:9092
group.id=clinicaleventmanager
zk.connect=localhost:2181
zookeeper.connect=localhost:2181
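
One way to rule out a misread configuration is to load the same text with java.util.Properties and check that the expected keys are present and non-blank. The sketch below is only an illustration: KafkaPropertiesCheck and missingKeys are hypothetical names, and which keys the Atmosphere Kafka extension actually reads is an assumption; the keys checked here are simply taken from the file shown above.

import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Atmosphere or Kafka.
final class KafkaPropertiesCheck {
    // Returns the keys from 'required' that are missing or blank in the given properties text.
    static List<String> missingKeys(String propertiesText, String... required) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> missing = new ArrayList<>();
        for (String key : required) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String text = "bootstrap.servers=localhost:9092\n"
                + "group.id=clinicaleventmanager\n";
        // Keys assumed to matter; adjust to whatever the extension really requires.
        System.out.println(missingKeys(text, "bootstrap.servers", "group.id"));
    }
}

In the web application, the text would come from the deployed file (for example via the class loader) rather than from a string literal.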

This is the AtmosphereServlet declaration in web.xml:

<servlet>
    <servlet-name>AtmosphereServlet</servlet-name>
    <servlet-class>org.atmosphere.cpr.AtmosphereServlet</servlet-class>
    <async-supported>true</async-supported>
    <init-param>
        <param-name>org.atmosphere.websocket.maxTextMessageSize</param-name>
        <param-value>1048576</param-value>
    </init-param>
    <init-param>
        <param-name>org.atmosphere.kafka.propertiesFile</param-name>
        <param-value>WEB-INF/classes/kafka.properties</param-value>
    </init-param>  
    <load-on-startup>2</load-on-startup>
</servlet>

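This is the encoder/decoder class:

import java.time.LocalDateTime;
import java.time.ZoneId;
import org.atmosphere.config.managed.Decoder;
import org.atmosphere.config.managed.Encoder;

public final class ClinicalEventManagerEncoderDecoder implements Encoder<DataEntryOperation, String>, Decoder<String, DataEntryOperation> {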

    private static final Genson genson = new Genson();

    @Override
    public DataEntryOperation decode(final String message) {
        try {
            DataEntryOperation dataEntryOperation = genson.deserialize(message, DataEntryOperation.class);
            dataEntryOperation.setDateTime(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
            return dataEntryOperation;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String encode(final DataEntryOperation message) {
        try {
            return genson.serialize(message);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

What is wrong?
