我使用通用池将数据从nodejs tcp客户机发送到javatcp服务器。
这是我的nodejs客户端入口点中的相关代码 blockEventListener.js
-此处显示完整代码:
function createPool() {
const factory = {
create: function() {
return new Promise((resolve, reject) => {
const socket = new net.Socket();
socket.connect({
host: sdkAddress,
port: sdkPort,
});
socket.setKeepAlive(true);
socket.on('connect', () => {
resolve(socket);
});
socket.on('error', error => {
if (error.code === "ECONNREFUSED") {
//console.log(`Retry after ${poolRetry}ms`);
setTimeout(() => {
socket.connect({
host: sdkAddress,
port: sdkPort,
});
}, poolRetry);
} else {
reject(error);
}
});
socket.on('close', hadError => {
console.log(`socket closed: ${hadError}`);
});
});
},
destroy: function(socket) {
return new Promise((resolve) => {
socket.destroy();
resolve();
});
},
validate: function (socket) {
return new Promise((resolve) => {
if (socket.destroyed || !socket.readable || !socket.writable) {
return resolve(false);
} else {
return resolve(true);
}
});
}
};
return genericPool.createPool(factory, {
max: poolMax,
min: poolMin,
maxWaitingClients: poolQueue,
testOnBorrow: true,
acquireTimeoutMillis: queueWait
});
}
const pool = createPool();
const poolStatusReport = `pool.spareResourceCapacity = ${pool.spareResourceCapacity}, pool.available = ${pool.available}, pool.borrowed = ${pool.borrowed}, pool.pending = ${pool.pending}`;
async function processPendingBlocks(ProcessingMap, channelid, configPath) {
setTimeout(async () => {
let nextBlockNumber = fs.readFileSync(configPath, "utf8");
let processBlock;
do {
processBlock = ProcessingMap.get(channelid, nextBlockNumber);
if (processBlock == undefined) {
break;
}
if (pool.spareResourceCapacity == 0 && pool.available == 0) {
break;
}
try {
const sock = await pool.acquire();
await blockProcessing.processBlockEvent(channelid, processBlock, sock, configPath);
await pool.release(sock);
} catch (error) {
console.error(`Failed to process block: ${error}`);
}
ProcessingMap.remove(channelid, nextBlockNumber);
fs.writeFileSync(configPath, parseInt(nextBlockNumber, 10) + 1);
nextBlockNumber = fs.readFileSync(configPath, "utf8");
} while (true);
processPendingBlocks(ProcessingMap, channelid, configPath);
}, blockProcessInterval);
}
这是中的相关代码 blockProcessing.js
-此处显示完整代码:
exports.processBlockEvent = async function (channelname, block, socket, configPath) {
return new Promise(async (resolve, reject) => {
// some code
for (var dataItem in dataArray) {
// more code
for (var actionItem in actions) {
// yet more code
for (var record in rwSet) {
// ignore lscc events
if (rwSet[record].namespace != "lscc") {
// create object to store properties
const writeObject = new Object();
writeObject.blocknumber = blockNumber;
writeObject.chaincodeid = chaincodeID;
writeObject.channelid = channelid;
writeObject.timestamp = timestamp;
writeObject.txnid = txnid;
writeObject.values = rwSet[record].rwset.writes;
writeToSocket(socket, writeObject, channelname, chaincodeID);
}
}
}
console.log("---------");
}
// update the nextblock.nextBlock file to retrieve the next block
fs.writeFileSync(configPath, parseInt(blockNumber, 10) + 1);
socket.write('<END>\n');
resolve(true);
})
}
function writeToSocket(socket, writeObject, channelname, chaincodeID) {
return new Promise(async (resolve, reject) => {
console.log(`ChannelID: ${writeObject.channelid}`);
console.log(`Transaction Timestamp: ${writeObject.timestamp}`);
console.log(`ChaincodeID: ${writeObject.chaincodeid}`);
console.log(`TxnID: ${writeObject.txnid}`);
console.log(writeObject.values);
let objstr = JSON.stringify(writeObject);
socket.on('error', function(ex) {
console.log('!!!!!!!!!!!!! ERROR !!!!!!!!!!!!!');
console.log(ex);
console.log(writeObject);
console.log('=================================');
});
socket.write(objstr);
socket.write('\n');
var outputLog = path.resolve(__dirname, folderLog, `${channelname}_${chaincodeID}.log`);
fs.appendFileSync(outputLog, objstr + "\n");
});
}
这是我的简单java tcp服务器中的代码:
package demo;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
public class SimpleListener {
public static void main(String[] args) throws IOException {
ServerSocket server = new ServerSocket(Integer.parseInt(args[0]));
int n = 0;
while (n < Integer.parseInt(args[1])) {
new ThreadSocket(server.accept());
n++;
}
server.close();
}
}
class ThreadSocket extends Thread{
private Socket insocket;
ThreadSocket(Socket insocket){
this.insocket = insocket;
this.start();
}
@Override
public void run() {
try {
InputStream is = insocket.getInputStream();
InputStreamReader reader = new InputStreamReader(is);
BufferedReader in = new BufferedReader(reader);
PrintWriter out = new PrintWriter(insocket.getOutputStream(), true);
StringBuilder sb = new StringBuilder();
String line = in.readLine();
while (line != null && !"<END>".equalsIgnoreCase(line)) {
sb.append(line + "\n");
line = in.readLine();
}
String output = sb.toString();
System.out.println("INCOMING: " + output);
out.println(200);
out.close();
in.close();
reader.close();
is.close();
insocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
当我运行这些程序时,我希望看到客户端读取的所有数据都被完整地发送到服务器。相反,我观察到的类似于这个示例输出。最初,信息是完整的。但是,在前五行之后(我认为这是因为我将池的最小大小设置为5),我开始看到信息中的空白。关键是,我明白了 EPIPE
以及 ERR_STREAM_DESTROYED
在我的客户端输出中:
!!!!!!!!!!!!! ERROR !!!!!!!!!!!!!
Error [ERR_STREAM_DESTROYED]: Cannot call write after a stream was destroyed
at doWrite (_stream_writable.js:399:19)
at writeOrBuffer (_stream_writable.js:387:5)
at Socket.Writable.write (_stream_writable.js:318:11)
at /root/nodejslistener/accessreal/product/sdk/nodejs/blockProcessing.js:103:16
at new Promise (<anonymous>)
at Object.exports.processBlockEvent (/root/nodejslistener/accessreal/product/sdk/nodejs/blockProcessing.js:16:12)
at Timeout._onTimeout (/root/nodejslistener/accessreal/product/sdk/nodejs/blockEventListener.js:185:39) {
code: 'ERR_STREAM_DESTROYED'
}
{
blocknumber: '427',
...more details...
}
epipe错误输出示例:
!!!!!!!!!!!!! ERROR !!!!!!!!!!!!!
Error: write EPIPE
at afterWriteDispatched (internal/stream_base_commons.js:156:25)
at writeGeneric (internal/stream_base_commons.js:147:3)
at Socket._writeGeneric (net.js:788:11)
at Socket._write (net.js:800:8)
at doWrite (_stream_writable.js:403:12)
at writeOrBuffer (_stream_writable.js:387:5)
at Socket.Writable.write (_stream_writable.js:318:11)
at /root/nodejslistener/accessreal/product/sdk/nodejs/blockProcessing.js:124:16
at new Promise (<anonymous>)
at writeToSocket (/root/nodejslistener/accessreal/product/sdk/nodejs/blockProcessing.js:109:12) {
errno: 'EPIPE',
code: 'EPIPE',
syscall: 'write'
}
{
blocknumber: '426',
...more details...
}
关于我目前的处境,我有几个问题:
当我忘记了 .close()
在我的java服务器中,我没有得到任何错误,但我只看到前五个数据块。后续的数据似乎丢失了,但我在我的客户机中没有看到任何错误。为什么?
我应该在nodejs客户机和/或java服务器中更改什么,以便成功地发送所有数据?
暂无答案!
目前还没有任何答案,快来回答吧!