我正在通过一个数据通道传输多个图像,使用从服务器到客户端的非阻塞连接。第一个客户机完全接收所有图像,但是第二个客户机丢失了第二个图像的消息/包,其余的都正常。
我有一个计数器来计算发送和接收的消息的数量,我还有下一个输出
服务器输出客户端输出
所以我收到了25405条信息,而不是28217条。
我还必须发送128字节的小程序包,如果不是的话,图像就会损坏。我猜某些消息丢失是因为程序正忙于处理前一条消息,但我不知道如何确保它到达。
代码如下:
服务器
public class Server extends Thread{
String initTag = "<in>";
String imgPacketTag = "<img>";
String titleTag = "<title>";
String endFileTag = "<end>";
int pto=9000;
String hhost="localhost";
InetSocketAddress clientSocket;
DatagramChannel s;
SelectionKey selKey;
Selector sel;
int bufferSize = 1024*1024;
ByteBuffer b = ByteBuffer.allocate(bufferSize);
File [] imagenes = new File[21];
final File folder = new File("imagenes");
public void run(){
listFilesForFolder(folder);
try{
// NetworkInterface ni = NetworkInterface.getByName("eth3");
InetSocketAddress dir = new InetSocketAddress(hhost, pto);
s = DatagramChannel.open(StandardProtocolFamily.INET);
s.setOption(StandardSocketOptions.SO_REUSEADDR, true);
// s.setOption(StandardSocketOptions.IP_MULTICAST_IF, ni);
// InetAddress group = InetAddress.getByName("230.0.0.1");
// s.join(group, ni);
s.configureBlocking(false);
s.socket().bind(dir);
sel = Selector.open();
s.register(sel, SelectionKey.OP_READ);
System.out.println("Servidor listo.. Esperando datagramas...");
while(true){
sel.select();
Iterator<SelectionKey>it = sel.selectedKeys().iterator();
while(it.hasNext()){
SelectionKey k = (SelectionKey)it.next();
it.remove();
if(k.isReadable()){
handleReading(k);
continue;
}
}//while
}//while
}catch(Exception e){
e.printStackTrace();
}//catch
}//main
public void processMsg(String msgReceived){
System.out.println(msgReceived);
if (msgReceived.contains(initTag)) {
sendImages();
}
}
public void sendImages(){
try{
for (int i = 0; i < imagenes.length; i++) {
File fileToSend = imagenes[i]; //Manejador
long tam = fileToSend.length();
sendMessage(titleTag+fileToSend.getName()+","+tam);
DataInputStream inputStream = new DataInputStream(new FileInputStream(fileToSend));
byte[] bArray = new byte[128];
int cont=0;
int bytesRead;
int sendPkg = 0;
while ( sendPkg < tam){
sendPkg+=128;
bytesRead = inputStream.read(bArray);
String newMsg;
if (sendPkg>=tam) {
newMsg = endFileTag + new String(bArray, 0, bytesRead, "ISO-8859-1");
}else{
newMsg = imgPacketTag + new String(bArray, 0, bytesRead, "ISO-8859-1");
}
sendMessage( newMsg );
cont++;
}
//sendMessage(endFileTag);
System.out.println("Archivo "+fileToSend.getName()+" enviado con "+cont);
inputStream.close();
}
s.register(sel, SelectionKey.OP_READ);
System.out.println("Archivos enviados");
}catch(Exception e){
e.printStackTrace();
}
}
public void sendMessage(String txt){
try {
byte[] msgBytes;
msgBytes = txt.getBytes("ISO-8859-1");
sendMessage(msgBytes);
} catch (UnsupportedEncodingException ex) {
Logger.getLogger(Server.class.getName()).log(Level.SEVERE, null, ex);
}
}
public void sendMessage(byte[] msgBytes){
try {
selKey = s.register(sel, SelectionKey.OP_WRITE);
DatagramChannel ch = (DatagramChannel)selKey.channel();
b.clear();
//byte[] msgBytes = txt.getBytes();
b.putInt(msgBytes.length);
b.put(msgBytes);
b.flip();
ch.send(b, clientSocket);
s.register(sel, SelectionKey.OP_READ);
} catch (ClosedChannelException ex) {
Logger.getLogger(Server.class.getName()).log(Level.SEVERE, null, ex);
} catch (IOException ex) {
Logger.getLogger(Server.class.getName()).log(Level.SEVERE, null, ex);
}
}
public void handleReading(SelectionKey k) throws IOException{
DatagramChannel ch = (DatagramChannel)k.channel();
b.clear();
SocketAddress emisor = ch.receive(b);
b.flip();
clientSocket = (InetSocketAddress)emisor;
System.out.println("Datagrama recibido desde "+ clientSocket.getAddress()+":"+clientSocket.getPort());
int len = b.getInt();
byte[] bytes = new byte[len];
b.get(bytes);
String msgReceived = new String(bytes);
processMsg(msgReceived);
}
public void listFilesForFolder(final File folder) {
File[] filesInFolder = folder.listFiles(); // This returns all the folders and files in your path
int i = 0;
for (File file : filesInFolder) { //For each of the entries do:
if (!file.isDirectory()) { //check that it's not a dir
imagenes[i] = file; //push the filename as a string
System.out.println(file.getName());
}
i++;
}
}
}
客户
public class Client extends Thread{
String nombre;
int tipo;
String msg;
int pto=9000;
String hhost="localhost";
SocketAddress remote=null;
DatagramChannel cl;
SelectionKey selKey;
Selector sel;
int bufferSize = 1024*1024;
ByteBuffer b = ByteBuffer.allocate(bufferSize);
String initTag = "<in>";
String imgPacketTag = "<img>";
String titleTag = "<title>";
String endFileTag = "<end>";
String currentImgTitle ="a.png";
FileOutputStream imgStream;
int contador = 0;
public Client(String nombre, int tipo){
this.nombre = nombre;
this.tipo = tipo;
msg = initTag+nombre;
File f = new File("imagenes");
if (f.mkdir()) {
System.out.println("directorio imagenes creado");
}
}
public void run(){
try{
remote = new InetSocketAddress(hhost, pto);
cl = DatagramChannel.open(StandardProtocolFamily.INET);
//cl.setOption(StandardSocketOptions.SO_REUSEADDR, true);
// cl.setOption(StandardSocketOptions.IP_MULTICAST_IF, ni);
cl.configureBlocking(false);
sel = Selector.open();
selKey = cl.register(sel, SelectionKey.OP_WRITE);
// NetworkInterface ni = NetworkInterface.getByName("eth3");
// InetAddress group = InetAddress.getByName("230.0.0.1");
// cl.join(group, ni);
while(true){
sel.select();
Iterator<SelectionKey>it = sel.selectedKeys().iterator();
while(it.hasNext()){
SelectionKey k = (SelectionKey)it.next();
it.remove();
if(k.isWritable()){
sendMessage();
continue;
}
if (k.isReadable()) {
handleReading(k);
continue;
}
}//while
}//while
}catch(Exception e){
e.printStackTrace();
}//catch
}
public void processMsg(String txt){
if (txt.contains(titleTag)) {
contador = 0;
String [] separadas = (txt.replace(titleTag, "")).split(",");
currentImgTitle = separadas[0];
System.out.println("title: "+currentImgTitle);
//System.out.println("tam: "+separadas[1]);
try {
imgStream = new FileOutputStream( "imagenes/"+currentImgTitle ) ;
} catch (FileNotFoundException ex) {
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
}
return;
}
if (txt.contains(endFileTag)) {
try {
contador++;
txt = txt.replace(endFileTag, "");
byte[] buffer = txt.getBytes("ISO-8859-1");
imgStream.write( buffer);
imgStream.flush();
System.out.println(" end of file "+contador);
imgStream.close();
} catch (IOException ex) {
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
}
return;
}
if (txt.contains(imgPacketTag)) {
contador++;
txt = txt.replace(imgPacketTag, "");
try {
byte[] buffer = txt.getBytes("ISO-8859-1");
imgStream.write( buffer);
imgStream.flush();
} catch (IOException ex) {
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
}
return;
}
}
public void sendMessage(){
DatagramChannel ch = (DatagramChannel)selKey.channel();
b.clear();
byte[] msgBytes = msg.getBytes();
b.putInt(msgBytes.length);
b.put(msgBytes);
b.flip();
try {
ch.send(b, remote);
cl.register(sel, SelectionKey.OP_READ);
} catch (ClosedChannelException ex) {
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
} catch (IOException ex) {
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
}
}
public void handleReading(SelectionKey k) throws IOException{
DatagramChannel ch = (DatagramChannel)k.channel();
b.clear();
SocketAddress emisor = ch.receive(b);
b.flip();
InetSocketAddress d = (InetSocketAddress)emisor;
//System.out.println("Datagrama recibido desde "+ d.getAddress()+":"+d.getPort());
int len = b.getInt();
byte[] bytes = new byte[len];
b.get(bytes);
String msgReceived = new String(bytes, "ISO-8859-1");
processMsg(msgReceived);
}
}
暂无答案!
目前还没有任何答案,快来回答吧!