我正在尝试学习使用java的sockets,我成功地将数据发送到在我自己的机器上运行的serversocket,现在我需要在我们的项目中实现paymentgateway应用程序
在server.java上,它挂在readline()中。如何解决这个问题?感谢您在这方面的帮助。工作正常,直到低于line.system.out.println(“在这里”);然后就被封锁了。
public class Server implements Runnable {
Server(Socket sock) throws IOException {
this.socket = sock;
}
static{
serverconfig = initServerConfig();
threadPool = Executors.newScheduledThreadPool(Integer.parseInt((String)serverconfig.get("SERVER_PROCESSOR_THREAD")));
}
private static HashMap initServerConfig(){
HashMap table = new HashMap();
ResourceBundle myResources = null;
Properties sysProps = System.getProperties();
sysProps.put("oracle.jdbc.V8Compatible", "true");
System.setProperties(sysProps);
Locale locale = Locale.getDefault();
try{
myResources = ResourceBundle.getBundle(FILENAME,locale);
}catch(MissingResourceException e){
log.error("server config not found", e);
}
if(myResources!=null){
log.info("Loading server configuraton ...");
table.put("SERVER_IP",myResources.getString("server.ip"));
table.put("SERVER_PORT",myResources.getString("server.port"));
table.put("SERVER_PROCESSOR_THREAD",myResources.getString("processor.threads"));
/** table.put("MSG_HEADER_LENGTH",myResources.getString("msgheader.length"));*/
table.put("ISO_HEADER_LENGTH",myResources.getString("isoheader.length"));
table.put("SOCKET_BUFFER_LENGTH",myResources.getString("socket.buffer.length"));
table.put("SOCKET_TIME_OUT_SEC",myResources.getString("socket.timeout.sec"));
String arrAllowed[] = myResources.getString("allowed.client.ip").split(",");
table.put("ALLOWED_CLIENT_IP",new ArrayList(Arrays.asList(arrAllowed)));
}
else
{
log.error("ISO8583 server module is not deployed properly");
System.exit(0);
}
return table;
}
public static void main(String[] args) throws Exception {
mfact=ConfigParser.createFromUrl(new URL("file:///D:/SmartFren/ISO8583/7.0.0/modules/ISO8583_Payment_Gateway/src/defaultconfig/server-config.xml"));
mfact.setAssignDate(true);
mfact.setTraceNumberGenerator(new SimpleTraceGenerator((int) (System.currentTimeMillis() % 10000)));
log.info("Setting up server socket...");
int port = Integer.parseInt((String)serverconfig.get("SERVER_PORT"));
if(available(port)) {
ServerSocket serverSocket = new ServerSocket(port);
log.info("Waiting for connections...");
while (true) {
Socket socket = serverSocket.accept();
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
//out.println("output");
String remoteAddrss = socket.getInetAddress().toString();
int remotePort = socket.getPort();
if(((List)serverconfig.get("ALLOWED_CLIENT_IP")).contains(remoteAddrss.replaceAll("/",""))) {
if(clientSockets.containsKey(remoteAddrss)) {
log.info(String.format("New connection from %s:%d so closing old connection socket", remoteAddrss, remotePort));
try {
((Socket)clientSockets.get(remoteAddrss)).close();
}catch (SocketException ex) {
log.error("SocketException occurred in closing old socket...", ex);
}catch (IOException ex) {
log.error("IOException occurred in closing old socket...", ex);
}
clientSockets.remove(remoteAddrss);
}else {
log.info(String.format("New connection from %s:%d", remoteAddrss, remotePort));
}
clientSockets.put(remoteAddrss, socket);
socket.setSoTimeout(Integer.parseInt((String)serverconfig.get("SOCKET_TIME_OUT_SEC"))*1000);//takes input in miliseconds.
socket.setReceiveBufferSize(Integer.parseInt((String)serverconfig.get("SOCKET_BUFFER_LENGTH")));
new Thread(new Server(socket), "Sitra-j8583-client-handler").start();
}else {
log.debug(String.format("Connected remote client %s is not allowed to accesss the services.",remoteAddrss));
try {
socket.close();
}catch(Exception ex) {
log.error("Exception Caught while closing the socket.",ex);
}
}
}
}else {
log.debug("Server is already running. Could not start another server instance.");
}
}
@Override
public void run() {
int count = 0;
System.out.println("in run");
synchronized(this){
this.runningThread = Thread.currentThread();
}
try {
ServerShutdownHook shutdownHook = new ServerShutdownHook(socket);
Runtime.getRuntime().addShutdownHook(shutdownHook);
while (socket.isConnected() && !socket.isClosed() && !socket.isInputShutdown()
&& this.runningThread.isAlive() && !this.runningThread.isInterrupted()) {
System.out.println("connection ");
System.out.println("here it is");
BufferedReader in = new BufferedReader (new InputStreamReader
(socket.getInputStream ()));
String fromClient = in.readLine();
System.out.println("fromClient"+fromClient);
log.info("value reciedvec" + fromClient);
byte[] buf = new byte[fromClient.getBytes().length];
// We're not expecting ETX in this case
buf =fromClient.getBytes();
count++;
threadPool.schedule(new Processor(buf, socket), 400,TimeUnit.MILLISECONDS);
/**}else{
System.out.println("Not in length buffer size");
}*/
}
}catch (SocketTimeoutException ex) {
log.error("SocketTimeoutException occurred...", ex);
}catch (SocketException ex) {
log.error("SocketException occurred...", ex);
}catch (IOException ex) {
log.error("IOException occurred...", ex);
}
log.debug(String.format("Exiting after reading %d requests", count));
try {
socket.close();//additional try to close socket if input stream is only closed.
} catch (IOException ex) {
log.error("Exception Caught while closing the socket.",ex);
}
}
private IsoMessage generateNetworkMessage(MessageFactory mfact,int iType){
IsoMessage req = mfact.newMessage(0x800);
req.setValue(ISO8583Fields.NETWORK_MANAGEMENT_INFORMATION_CODE, iType, IsoType.NUMERIC, 3);
return req;
}
private static boolean available(int port) {
ServerSocket ss = null;
DatagramSocket ds = null;
try {
ss = new ServerSocket(port);
ss.setReuseAddress(true);
ds = new DatagramSocket(port);
ds.setReuseAddress(true);
return true;
} catch (IOException e) {
log.error("IOException Caught.",e);
} finally {
if (ds != null) {
ds.close();
}
if (ss != null) {
try {
ss.close();
} catch (IOException e) {
log.error("IOException Caught.",e);
/* should not be thrown */
}
}
}
return false;
}
public synchronized void stop(){
this.isStopped = true;
try {
this.serverSocket.close();
} catch (IOException e) {
log.error("Error closing server", e);
}
}
private class ServerShutdownHook extends Thread {
private Socket sock = null;
ServerShutdownHook(Socket inSoc){
this.sock = inSoc;
}
@Override
public void run() { System.out.println(" ServerShutdownHook connection ");
try{
if(sock != null && sock.isConnected() ){
IsoMessage req = generateNetworkMessage(mfact, 002);
log.info("Sending logoff Trace " + req.getField(ISO8583Fields.SYSTEMS_TRACE_AUDIT_NUMBER)+" at "+System.currentTimeMillis()+" "+new String(req.writeData()));
req.write(sock.getOutputStream(), 2);
}
Thread.sleep(100);
}catch (IOException ex) {
log.error("Couldn't close socket",ex);
}catch(InterruptedException ie){
log.error("Exception occurred...", ie);
Thread.currentThread().interrupt();
}finally{
sock = null;
}
}
}
private class Processor implements Runnable {
private byte[] msg;
private Socket sock;
private IBaseHandler handler = null;
private boolean isException = false;
Processor(byte[] buf, Socket s) {
msg = buf;
sock = s;
}
@Override
public void run() {
try {
Transaction8583DAO transactionDAO = new Transaction8583DAO();
TRANSACTION8583 transaction8583 = new TRANSACTION8583();
transaction8583.setReqString(new String(msg).trim());
transaction8583.setEventStartTime(new Date());
transaction8583.setEventStatus("Generated");
transactionDAO.save(transaction8583);
/**IsoMessage incoming = mfact.parseMessage(msg, Integer.parseInt((String)serverconfig.get("ISO_HEADER_LENGTH"))); */
IsoMessage incoming = mfact.parseMessage(msg, 0);
log.debug("info :::::"+incoming.getField(ISO8583Fields.TRANSMISSION_DATE_TIME));
log.info("Starting handler for Trace " + incoming.getField(ISO8583Fields.SYSTEMS_TRACE_AUDIT_NUMBER)+" at "+System.currentTimeMillis()+" "+new String(msg));
log.info("Incoming" +incoming.toString());
handler = new SampoernaHandler(mfact);
IsoMessage response = handler.handleRequest(incoming,transaction8583);
log.debug("AAAAAAAAAAAAAA"+sock.getOutputStream().toString());
log.debug("BBBBBBBBBBBBB"+sock.getOutputStream());
/* byte[] s = response.writeData();
String str = null;
try {
str = new String(s, "UTF-8");
} catch (UnsupportedEncodingException e) {
log.error("error while getting response string", e);
}
OutputStream outstream = socket.getOutputStream();
PrintWriter out = new PrintWriter(outstream);
out.print(str+" \n");
*//*
OutputStream outstream = socket.getOutputStream();
PrintWriter out = new PrintWriter(outstream);
out.print("abcccc"+" \n");*/
byte[] s = response.writeData();
String str = null;
try {
str = new String(s, "UTF-8");
} catch (UnsupportedEncodingException e) {
log.error("error while getting response string", e);
}
OutputStream os = socket.getOutputStream();
OutputStreamWriter osw = new OutputStreamWriter(os);
BufferedWriter bw = new BufferedWriter(osw);
bw.write(str +"\r\n");
System.out.println("Message sent to the client is "+str);
bw.flush();
// response.write(sock.getOutputStream(), 0);//here message header length not need to write explicitly.
} catch (IOException ex) {
log.error("IOException Sending response", ex);
isException = true;
} catch(Exception e){
isException = true;
log.error("Exception occur.",e);
} finally {
try {
if(this.isException) {
this.sock.close();
log.debug("Closing Socket.. because of some error");
}
} catch (IOException ex) {
log.error("IOException Sending response", ex);
}
}
}
public void print(IsoMessage m) {
log.debug("TYPE: " + Integer.toHexString(m.getType()));
for (int i = 2; i < 128; i++) {
if (m.hasField(i)) {
log.debug("printing Field");
log.debug("F " + i + "(" + m.getField(i).getType() + "): " + m.getObjectValue(i) + " -> '"+ m.getField(i).toString() + "'");
}
}
}
}
}
在sampleclient.java中,当我们向服务器发送请求时,它会被阻塞。
public class SampleClient{
private static final Log log = LogFactory.getLog(SampleClient.class);
private static MessageFactory mfact;
private static Hashtable pending = new Hashtable();
public SampleClient() {
}
public static void main(String[] args) throws Exception {
Random rng = new Random(System.currentTimeMillis());
log.debug("Reading config");
mfact = ConfigParser.createFromClasspathConfig("server-config.xml");
mfact.setAssignDate(true);
mfact.setTraceNumberGenerator(new SimpleTraceGenerator((int)(System.currentTimeMillis() % 10000)));
log.debug("Connecting to server");
System.setProperty("javax.net.debug","all");
//Socket sock = new Socket("127.0.0.1", 9999);
Socket sock = new Socket("192.168.1.5", 9999);
if(sock.isConnected()) {
//Send 10 messages, then wait for the responses
//SampleClient reader = new SampleClient(sock);
new Thread(new SampleClient().new Receiver(sock),"j8583-SampleClient-Receiver").start();
String accountNumber = "30033997";
//Logn Message
IsoMessage req = generateNetworkMessage(mfact,NetworkManagementCode.LOGIN);
pending.put(req.getField(11).toString(), req);
System.out.println("req field 11"+req.getField(11).toString());
log.info(String.format("Sending request %s", req.getField(11)+" : "+req.getObjectValue(11))+" "+new String(req.writeData()));
req.write(sock.getOutputStream(), 0);
Thread.sleep(500);
}}
private static IsoMessage generateNetworkMessage(MessageFactory mfact,int iType){
IsoMessage req = mfact.newMessage(0x800);
req.setValue(ISO8583Fields.NETWORK_MANAGEMENT_INFORMATION_CODE, iType, IsoType.NUMERIC, 3);
req.setIsoHeader(null);
int i = new String(req.writeData()).length();
req.setIsoHeader(String.format("%1$04d", i));
System.out.println("req header "+req.toString());
return req;
}
final class Receiver implements Runnable {
private Socket sock;
private IBaseHandler handler = null;
Receiver(Socket s) {
sock = s;
log.debug("Receiver: sock:"+sock.isConnected()+" Input: "+sock.isInputShutdown()+" Output: "+sock.isOutputShutdown());
}
public void run() {
byte[] lenbuf = new byte[4];
try {
log.debug("Thread started with pending size:"+pending.size() );
// For high volume apps you will be better off only reading the stream in this thread
// and then using another thread to parse the buffers and process the requests
// Otherwise the network buffer might fill up and you can miss a request.
while (sock != null && sock.isConnected()) {// && Thread.currentThread().isAlive() && !Thread.currentThread().isInterrupted()) {
if (sock.getInputStream().read(lenbuf) == lenbuf.length) {
StringBuffer sb = new StringBuffer();
for(int i=0;i<lenbuf.length;i++){
System.out.println("lenbuf> "+lenbuf[i]);
System.err.print("lenbuf["+i+"]:"+Integer.parseInt(lenbuf[i]+""));
sb.append((char)Integer.parseInt(lenbuf[i]+""));
}
//int size = ((lenbuf[0] & 0xff) << 8) | (lenbuf[1] & 0xff);
int size = Integer.parseInt(sb.toString());
byte[] buf = new byte[size];
//We're not expecting ETX in this case
if (sock.getInputStream().read(buf) == size) {
try {
//We'll use this header length as a reference.
//In practice, ISO headers for any message type are the same length.
//log.debug("new String(buf):"+new String(buf));
IsoMessage resp = mfact.parseMessage(buf,0);
//IsoMessage incoming = mfact.parseMessage(buf, Integer.parseInt((String)serverconfig.get("ISO_HEADER_LENGTH")));
//print(resp);
log.debug("Read response " + resp.getField(11) + " conf " + resp.getField(38) + ": " + new String(buf));
pending.remove(resp.getField(11).toString());
} catch (ParseException ex) {
log.error("Parsing response", ex);
}
} else {
log.debug("Clearing Pending:"+pending.size());
pending.clear();
return;
}
}
}
log.info("Socket is disconnected. While pending has "+pending.size());
} catch (IOException ex) {
log.error("Reading responses", ex);
} finally {
}
}
public void print(IsoMessage m) {
log.debug("TYPE: " + Integer.toHexString(m.getType()));
for (int i = 2; i < 128; i++) {
if (m.hasField(i)) {
log.debug("F " + i + "(" + m.getField(i).getType() + "): " + m.getObjectValue(i) + " -> '"+ m.getField(i).toString() + "'");
}
}
}
}
}
暂无答案!
目前还没有任何答案,快来回答吧!