我有有趣的问题与akka TCP流:
查看代码:
package snp.server;
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Tcp;
import akka.stream.javadsl.Tcp.OutgoingConnection;
import akka.util.ByteString;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.Attributes;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Framing;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
public class TCPClient_SinkSource1 {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
final ActorSystem system = ActorSystem.create("StreamTcpDocTest");
final Flow<ByteString, ByteString, CompletionStage<OutgoingConnection>> connection
= Tcp.get(system).outgoingConnection("192.168.62.130", 59090).map( f -> {
System.out.println("out1:" + f.utf8String());
return f;
});
final Sink<ByteString, CompletionStage<Done>> sink = Sink.foreach(f -> {
System.out.println("from server:" + f.utf8String());
});
Source<ByteString, NotUsed> source = Source.range(1, 5).map(f -> ByteString.fromString(f.toString()))
.throttle(1, Duration.ofMillis(30));
Flow<ByteString, ByteString, NotUsed> clientFlow = Flow.fromSinkAndSource(sink, source).map( f -> {
System.out.println("out:" + f.utf8String());
return f;
});
CompletionStage<OutgoingConnection> connectionCS = connection
.join(clientFlow).run(system);
connectionCS.whenComplete((d, e) -> {
System.out.println("client conn: localAddress:" + d.localAddress()
+ " remoteAddress:" + d.remoteAddress());
System.out.println("e:" + e);
});
}
}
结果以某种方式连接,服务器回复例如为123 45。
当我增加到:* * 油门(1,持续时间毫秒(3000));**
服务器的回复是一个接一个的,正如我所期望的那样。有人能描述一下如何避免连接回复吗?
1条答案
按热度按时间bpzcxfmw1#
您不应将单个
ByteString
视为网络协议中的帧,而应使用一些实际的帧,并在使用端收集整个帧,单个ByteString
可能最终会成为多个ByteString
,或者在接收端反过来,这取决于发送系统和中间的网络。正确的成帧可以由几个运营商开箱即用Akka Streams完成:https://doc.akka.io/docs/akka/current/stream/stream-io.html#using-framing-in-your-protocol
对于这个特定的例子,我想你可以看到Akka流TCP实现中的优化结果,它试图避免多个系统调用(它们是昂贵的),方法是在实际将其交给操作系统之前,在一个小的时间范围内批处理尽可能多的字节。