
brtdzjyr  于 2021-07-09  发布在  Java


int numBytes = socketChannel.write(_send_buffer);

其中 socketChannel 是 java.nio.channels.SocketChannel 的实例,
而 _send_buffer 是 java.nio.ByteBuffer 的实例。
代码通过非阻塞选择器的写操作到达这一行,问题出现在 _send_buffer 很大的时候。当 _send_buffer 小于 20 MB 时代码没有任何问题,但使用更大的缓冲区(例如 >100 MB)进行测试时就会失败。
我的通道应该已设置为非阻塞,所以我认为写操作应该只会尝试写入套接字输出缓冲区所能容纳的数据量。由于之前没有指定该缓冲区的大小,我尝试通过 setOption 方法和 SO_SNDBUF 选项把它设置为 1024 字节,即:

socketChannel.setOption(SO_SNDBUF, 1024);


2021-04-22 11:52:44.260 11591-11733/jp.oist.abcvlib.serverLearning I/.serverLearnin: Clamp target GC heap from 195MB to 192MB
2021-04-22 11:52:44.260 11591-11733/jp.oist.abcvlib.serverLearning I/.serverLearnin: Alloc concurrent copying GC freed 2508(64KB) AllocSpace objects, 0(0B) LOS objects, 10% free, 171MB/192MB, paused 27us total 12.714ms
2021-04-22 11:52:44.261 11591-11733/jp.oist.abcvlib.serverLearning W/.serverLearnin: Throwing OutOfMemoryError "Failed to allocate a 49915610 byte allocation with 21279560 free bytes and 20MB until OOM, target footprint 201326592, growth limit 201326592" (VmSize 5585608 kB)
2021-04-22 11:52:44.261 11591-11733/jp.oist.abcvlib.serverLearning I/.serverLearnin: Starting a blocking GC Alloc
2021-04-22 11:52:44.261 11591-11733/jp.oist.abcvlib.serverLearning I/.serverLearnin: Starting a blocking GC Alloc

现在我可以在调试器中单步执行并停在写入那一行而不崩溃,所以我认为容纳 _send_buffer 本身所需的内存没有问题;但在尝试写入时,后台的某些东西又进行了一次分配,而这次分配太大了。
也许我的思路不对,需要限制 _send_buffer 的大小,但我认为应该有办法限制 write 调用所做的分配,不是吗?或者至少可以通过某种方式为我的应用分配更多的 Android 内存。我用的是 Pixel 3a,按规格它应该有 4 GB 内存。我知道这些内存必须与系统的其他部分共享,但这是一台基本的测试设备(没有安装游戏、个人应用等),所以我认为应该可以使用这 4 GB 中相当大的一部分。根据上面的 logcat,我的增长上限是 201326592 字节,所以我觉得奇怪的是,这个上限只有规格内存的 0.2/4.0 = 5%。


应评论中的请求添加一些代码上下文。注意这不是一个可运行的示例,因为代码库非常大,而且由于公司政策,我不能分享全部代码。另请注意,_send_buffer 与 SocketChannel 自身的发送缓冲区(即 getSendBufferSize 所指的内容)无关,它只是一个 ByteBuffer,我在通过通道发送之前用它把所有内容打包在一起。由于我不能分享生成 _send_buffer 的全部相关代码,请只需知道它是一个可能非常大(>100 MB)的 ByteBuffer。如果这本身就是根本问题,请指出这一点并说明原因。


注意 sendMsgToServer 方法被重写(未作修改),并由主 Android Activity(未展示)调用。byte[] episode 参数会在 SocketMessage(下一节)中被包装成 ByteBuffer,之后在 write 方法中放入 _send_buffer。

package jp.oist.abcvlib.util;

import android.util.Log;

import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Set;

import static;

public class SocketConnectionManager implements Runnable{

    private SocketChannel sc;
    private Selector selector;
    private SocketListener socketListener;
    private final String TAG = "SocketConnectionManager";
    private SocketMessage socketMessage;
    private final String serverIp;
    private final int serverPort;

    /**
     * Stores the listener and the server endpoint. No connection is opened here;
     * the values are consumed later by {@code run()} / {@code start_connection}.
     *
     * @param socketListener listener handed to the SocketMessage created in start_connection
     * @param serverIp       server host/IP used to build the InetSocketAddress
     * @param serverPort     server port used to build the InetSocketAddress
     */
    public SocketConnectionManager(SocketListener socketListener, String serverIp, int serverPort){
        this.socketListener = socketListener;
        this.serverIp = serverIp;
        this.serverPort = serverPort;

    /**
     * Selector loop: starts the connection, then keeps iterating over the selected keys
     * for as long as the selector is open, casting each key's attachment to a SocketMessage.
     *
     * NOTE(review): several statements are incomplete in this listing (the right-hand sides
     * of {@code selector =} and {@code int eventCount =}, and the try/brace structure), so
     * the exact select call and per-key handling cannot be confirmed from here.
     */
    public void run() {
        try {
            selector =;
            start_connection(serverIp, serverPort);
            do {
                int eventCount =;
                Set<SelectionKey> events = selector.selectedKeys(); // this is the selected-key Set itself, not a count (the int is eventCount above); unused in the visible lines
                if (eventCount != 0){
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    // NOTE(review): no removal of handled keys from selectedKeys is visible here;
                    // a Selector never clears its selected-key set on its own — confirm it is done.
                    for (SelectionKey selectedKey : selectedKeys){
                            // The attachment is the SocketMessage passed to register(...).
                            SocketMessage socketMessage = (SocketMessage) selectedKey.attachment();
                        }catch (ClassCastException e){
                            Log.e(TAG,"Error", e);
                            Log.e(TAG, "selectedKey attachment not a SocketMessage type");
            } while (selector.isOpen()); //todo remember to close the selector somewhere

        } catch (IOException e) {
            Log.e(TAG,"Error", e);

    /**
     * Creates the channel's SocketMessage, registers the channel for OP_CONNECT with the
     * SocketMessage as attachment, and then initiates the connection to the server.
     * All failures are logged rather than propagated.
     *
     * @param serverIp   server host/IP
     * @param serverPort server port
     */
    private void start_connection(String serverIp, int serverPort){
        try {
            InetSocketAddress inetSocketAddress = new InetSocketAddress(serverIp, serverPort);
            // NOTE(review): the channel creation (and any configureBlocking call) is missing
            // from this listing.
            sc =;
            // SO_SNDBUF is a size hint for the socket's own send buffer. It does not limit how
            // large a ByteBuffer may be handed to SocketChannel.write(), nor how much memory the
            // channel implementation uses to stage that buffer.
            sc.setOption(SO_SNDBUF, 1024);
            socketMessage = new SocketMessage(socketListener, sc, selector);

            Log.v(TAG, "registering with selector to connect");
            int ops = SelectionKey.OP_CONNECT;
            sc.register(selector, ops, socketMessage);

            Log.d(TAG, "Initializing connection with " + inetSocketAddress);
            // The result is not read in the visible code; completion is presumably observed
            // through the OP_CONNECT event instead.
            boolean connected = sc.connect(inetSocketAddress);
            Log.v(TAG, "socketChannel.isConnected ? : " + sc.isConnected());

        } catch (IOException | ClosedSelectorException | IllegalBlockingModeException
                | CancelledKeyException | IllegalArgumentException e) {
            Log.e(TAG, "Initial socket connect and registration:", e);

    /**
     * Queues an episode for sending by delegating to
     * {@code SocketMessage.addEpisodeToWriteBuffer}.
     *
     * @param episode raw bytes of the episode to send
     */
    public void sendMsgToServer(byte[] episode){
        // The returned flag is not used in the visible code.
        boolean writeSuccess = socketMessage.addEpisodeToWriteBuffer(episode);

    /**
     * Should be called prior to exiting app to ensure zombie threads don't remain in memory.
     */
    public void close(){
        // Logs the remote address of the channel being closed.
        // NOTE(review): the statements that actually close the channel/selector are not
        // visible in this listing.
        try {
            // getRemoteAddress() declares IOException, hence the surrounding try.
            Log.v(TAG, "Closing connection: " + sc.getRemoteAddress());
        } catch (IOException e) {
            Log.e(TAG,"Error", e);


这段代码很大程度上受到了 Real Python 套接字教程中示例 Python 代码的启发。之所以这样做,是因为服务器端运行的是 Python 代码,而客户端运行的是 Java。因此,如果想了解代码为什么这样写,请参考 Real Python 的套接字教程。我基本上以 app-server.py 作为代码模板,并为客户端翻译成了 Java(有所修改)。

package jp.oist.abcvlib.util;

import android.util.Log;

import org.json.JSONException;
import org.json.JSONObject;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.util.Vector;

public class SocketMessage {

    private final SocketChannel sc;
    private final Selector selector;
    private final ByteBuffer _recv_buffer;
    private ByteBuffer _send_buffer;
    private int _jsonheader_len = 0;
    private JSONObject jsonHeaderRead; // Will tell Java at which points in msgContent each model lies (e.g. model1 is from 0 to 1018, model2 is from 1019 to 2034, etc.)
    private byte[] jsonHeaderBytes;
    private ByteBuffer msgContent; // Should contain ALL model files. Parse to individual files after reading
    private final Vector<ByteBuffer> writeBufferVector = new Vector<>(); // List of episodes
    private final String TAG = "SocketConnectionManager";
    private JSONObject jsonHeaderWrite;
    private boolean msgReadComplete = false;
    private SocketListener socketListener;
    private long socketWriteTimeStart;
    private long socketReadTimeStart;

    /**
     * Stores the collaborators and allocates the initial 1024-byte heap buffers for
     * receiving and sending.
     *
     * @param socketListener callback target stored for later notifications
     * @param sc             channel this message reads from / writes to
     * @param selector       selector the channel is registered with
     */
    public SocketMessage(SocketListener socketListener, SocketChannel sc, Selector selector){
        // NOTE(review): the left-hand side of "= sc;" is missing in this listing
        // (presumably an assignment to the sc field).
        this.socketListener = socketListener; = sc;
        this.selector = selector;
        this._recv_buffer = ByteBuffer.allocate(1024);
        // Placeholder only: write() replaces _send_buffer with a message-sized buffer.
        this._send_buffer = ByteBuffer.allocate(1024);

    /**
     * Dispatches on the readiness of the given key (connectable, writable, readable).
     * ClassCastException, IOException and JSONException are caught and logged.
     *
     * NOTE(review): the bodies of the writable/readable branches are not visible in this
     * listing; presumably they call write(...) and read(...).
     *
     * @param selectionKey key reported ready by the selector
     */
    public void process_events(SelectionKey selectionKey){
        SocketChannel sc = (SocketChannel);
//        Log.i(TAG, "process_events");
            if (selectionKey.isConnectable()){
                Log.d(TAG, "Finished connecting to " + ((SocketChannel);
                Log.v(TAG, "socketChannel.isConnected ? : " + sc.isConnected());

            if (selectionKey.isWritable()){
//                Log.i(TAG, "write event");
            if (selectionKey.isReadable()){
//                Log.i(TAG, "read event");
//                int ops = SelectionKey.OP_WRITE;
//                sc.register(selectionKey.selector(), ops, selectionKey.attachment());

        } catch (ClassCastException | IOException | JSONException e){
            Log.e(TAG,"Error", e);

    /**
     * One step of the receive state machine. The stage is chosen from the current state:
     * protoheader while {@code _jsonheader_len == 0}, JSON header while
     * {@code jsonHeaderRead == null}, message content while {@code !msgReadComplete}.
     *
     * NOTE(review): the calls made inside each branch are not visible in this listing;
     * presumably process_protoheader / process_jsonheader / process_msgContent.
     *
     * @param selectionKey key whose channel is readable
     */
    private void read(SelectionKey selectionKey) throws IOException, JSONException {

        SocketChannel socketChannel = (SocketChannel);

            // At this point the _recv_buffer should have been cleared (pointer 0 limit=cap, no mark)
            // NOTE(review): despite the name, this presumably holds a byte count (the
            // commented-out log below reports it as bytes).
            int bitsRead =;

            // Continue if new data arrived or earlier data is still pending in _recv_buffer.
            if (bitsRead > 0 || _recv_buffer.position() > 0){
                if (bitsRead > 0){
//                    Log.v(TAG, "Read " + bitsRead + " bytes from " + socketChannel.getRemoteAddress());

                // If you have not determined the length of the header via the 2 byte short protoheader,
                // try to determine it, though there is no guarantee it will have enough bytes. So it may
                // pass through this if statement multiple times. Only after it has been read will
                // _jsonheader_len have a non-zero length;
                if (this._jsonheader_len == 0){
                    // Start of a new incoming message: remember when reading began (used for timing).
                    socketReadTimeStart = System.nanoTime();
                // _jsonheader_len will only be larger than 0 if set properly (finished being set).
                // jsonHeaderRead will be null until the buffer gathering it has filled and converted it to
                // a JSONobject.
                else if (this.jsonHeaderRead == null){
                else if (!msgReadComplete){
                } else {
                    Log.e(TAG, "bitsRead but don't know what to do with them");

    /**
     * Sends the first queued episode. On the first call for an episode it builds a JSON
     * header, allocates a {@code _send_buffer} sized for protoheader + JSON header + episode,
     * and writes once; later calls continue writing the remainder. When nothing remains it
     * logs the elapsed time, resets {@code jsonHeaderWrite} and re-registers the channel for
     * OP_READ.
     *
     * NOTE(review): the statements that fill/flip _send_buffer, the else-branch write and the
     * removal of the episode from writeBufferVector are missing from this listing.
     *
     * @param selectionKey key whose channel is writable
     */
    private void write(SelectionKey selectionKey) throws IOException, JSONException {

        if (!writeBufferVector.isEmpty()){
            SocketChannel socketChannel = (SocketChannel);

            Log.v(TAG, "writeBufferVector contains data");

            // jsonHeaderWrite == null marks "no message currently in flight".
            if (jsonHeaderWrite == null){
                int numBytesToWrite = writeBufferVector.get(0).limit();

                // Create JSONHeader containing length of episode in Bytes
                Log.v(TAG, "generating jsonheader");
                jsonHeaderWrite = generate_jsonheader(numBytesToWrite);
                byte[] jsonBytes = jsonHeaderWrite.toString().getBytes(StandardCharsets.UTF_8);

                // Encode length of JSONHeader to first two bytes and write to socketChannel
                // NOTE(review): the size computed below reserves Integer.BYTES (4 bytes) for the
                // protoheader while the comment above says two bytes — confirm which is written.
                int jsonLength = jsonBytes.length;

                // Add up length of protoHeader, JSONheader and episode bytes
                int totalNumBytesToWrite = Integer.BYTES + jsonLength + numBytesToWrite;

                // Create new buffer that compiles protoHeader, JsonHeader, and Episode
                // This is a second heap allocation as large as the whole episode: the episode
                // already lives in writeBufferVector, so a big message is held twice while sending.
                _send_buffer = ByteBuffer.allocate(totalNumBytesToWrite);

                Log.v(TAG, "Assembling _send_buffer");
                // Assemble all bytes and flip to prepare to read

                Log.d(TAG, "Writing to server ...");

                // Write Bytes to socketChannel //todo shouldn't be while as should be non-blocking
                // NOTE(review): _send_buffer is a heap buffer. Channel implementations commonly
                // stage a heap buffer through a temporary buffer sized to remaining() before the
                // native write, which would explain the extra multi-MB allocation reported at this
                // line. Writing through a size-capped slice (temporarily lowered limit), or
                // building the message in an allocateDirect buffer, should avoid it — verify on
                // the target runtime.
                if (_send_buffer.remaining() > 0){
                    int numBytes = socketChannel.write(_send_buffer); // todo memory dump error here!
                    int percentDone = (int) Math.ceil((((double) _send_buffer.limit() - (double) _send_buffer.remaining())
                            / (double) _send_buffer.limit()) * 100);
                    int total = _send_buffer.limit() / 1000000;
//                    Log.d(TAG, "Sent " + percentDone + "% of " + total + "Mb to " + socketChannel.getRemoteAddress());
            } else{
                // Write Bytes to socketChannel
                if (_send_buffer.remaining() > 0){
            // Everything has been handed to the channel once nothing remains in _send_buffer.
            if (_send_buffer.remaining() == 0){
                int total = _send_buffer.limit() / 1000000;
                // 10e-10 == 1e-9, i.e. this converts nanoseconds to seconds.
                double timeTaken = (System.nanoTime() - socketWriteTimeStart) * 10e-10;
                DecimalFormat df = new DecimalFormat();
                Log.i(TAG, "Sent " + total + "Mb in " + df.format(timeTaken) + "s");
                // Remove episode from buffer so as to not write it again.
                // Clear sending buffer
                // make null so as to catch the initial if statement to write a new one.
                jsonHeaderWrite = null;

                // Set socket to read now that writing has finished.
                Log.d(TAG, "Reading from server ...");
                int ops = SelectionKey.OP_READ;
                sc.register(selectionKey.selector(), ops, selectionKey.attachment());


    /**
     * Builds the JSON header that describes the payload which follows it.
     *
     * @param numBytesToWrite payload length in bytes, stored under "content-length"
     * @return header with the keys "byteorder", "content-length", "content-type" and
     *         "content-encoding"
     * @throws JSONException if a value cannot be stored in the JSONObject
     */
    private JSONObject generate_jsonheader(int numBytesToWrite) throws JSONException {
        JSONObject jsonHeader = new JSONObject();

        // NOTE(review): this reports the platform's native byte order; whether the payload
        // bytes were actually produced in that order is not visible here — confirm.
        jsonHeader.put("byteorder", ByteOrder.nativeOrder().toString());
        jsonHeader.put("content-length", numBytesToWrite);
        jsonHeader.put("content-type", "flatbuffer"); // stale todo: the value is already "flatbuffer"
        jsonHeader.put("content-encoding", "flatbuffer"); // stale todo: the value is already "flatbuffer"
        return jsonHeader;

    /**
     * recv_buffer may contain 0, 1, or several bytes. If it holds at least hdrlen bytes, then
     * process the first two bytes to obtain the length of the jsonheader. Else exit this function
     * and read from the socket again until the buffer holds at least hdrlen bytes.
     */
    private void process_protoheader() {
        // Reads the 2-byte length prefix once at least two bytes are buffered and sizes
        // jsonHeaderBytes accordingly.
        Log.v(TAG, "processing protoheader");
        int hdrlen = 2;
        if (_recv_buffer.position() >= hdrlen){
            _recv_buffer.flip(); //pos at 0 and limit set to bitsRead
            // getShort() is signed: a length of 32768 or more would come back negative and make
            // the array allocation below throw NegativeArraySizeException.
            _jsonheader_len = _recv_buffer.getShort(); // Read 2 bytes converts to short and move pos to 2
            // allocate a byte array (not a ByteBuffer) to store the full jsonheader
            jsonHeaderBytes = new byte[_jsonheader_len];


            Log.v(TAG, "finished processing protoheader");

    /**
     *  As with process_protoheader, we check whether _recv_buffer contains enough bytes to
     *  generate the jsonHeader object, and if not, leave it alone and read more from the socket.
     */
    private void process_jsonheader() throws JSONException {
        // Parses the JSON header once enough bytes are buffered, then allocates msgContent with
        // the "content-length" announced by the server.

        Log.v(TAG, "processing jsonheader");

        // If you have enough bytes in the _recv_buffer to write out the jsonHeader
        // NOTE(review): the comparison is strict, so a buffer holding exactly _jsonheader_len
        // bytes does not enter this branch — confirm whether "<= 0" was intended.
        // NOTE(review): the statements copying bytes into jsonHeaderBytes are not visible here.
        if (_jsonheader_len - _recv_buffer.position() < 0){
            // jsonheaderBuffer should now be full and ready to convert to a JSONobject
            // new String(byte[]) decodes with the platform default charset, whereas the sending
            // side of this class encodes its header explicitly as UTF-8.
            jsonHeaderRead = new JSONObject(new String(jsonHeaderBytes));
            Log.d(TAG, "JSONheader from server: " + jsonHeaderRead.toString());

                // The whole message body is allocated on the heap in one piece; a very large
                // content-length is subject to the same per-app heap limit as the send path.
                int msgLength = (int) jsonHeaderRead.get("content-length");
                msgContent = ByteBuffer.allocate(msgLength);
            }catch (JSONException e) {
                Log.e(TAG, "Couldn't get content-length from jsonHeader sent from server", e);
        // Else return to selector and read more bytes into the _recv_buffer

        // If there are any bytes left over (part of the msg) then move them to the front of the buffer
        // to prepare for another read from the socket

    /**
     * Here it is a bit different, as it may take multiple full _recv_buffers to fill msgContent.
     * So check if msgContent.remaining() is larger than 0 and if so, dump everything from
     * _recv_buffer into it.
     * @param selectionKey used to reference the instance and selector
     * @throws IOException declared by the method signature; covers ClosedChannelException
     *         from re-registering the channel
     */
    private void process_msgContent(SelectionKey selectionKey) throws IOException {
        // Once msgContent is full: notifies the listener, resets the header state, logs timing
        // and re-registers the channel for OP_WRITE.

        if (msgContent.remaining() > 0){
            _recv_buffer.flip(); //pos at 0 and limit set to bitsRead set ready to read
            // NOTE(review): the transfer from _recv_buffer into msgContent is not visible in
            // this listing.

        if (msgContent.remaining() == 0){
            // msgContent should now be full and ready to convert to a various model files.
            socketListener.onServerReadSuccess(jsonHeaderRead, msgContent);

            // Clear for next round of communication
            _jsonheader_len = 0;
            jsonHeaderRead = null;

            int totalBytes = msgContent.capacity() / 1000000;
            // 10e-10 == 1e-9, i.e. this converts nanoseconds to seconds.
            double timeTaken = (System.nanoTime() - socketReadTimeStart) * 10e-10;
            DecimalFormat df = new DecimalFormat();
            Log.i(TAG, "Entire message containing " + totalBytes + "Mb recv'd in " + df.format(timeTaken) + "s");

            // NOTE(review): read() only enters its message-content branch while
            // msgReadComplete is false, and no reset to false is visible in this listing —
            // confirm it is cleared before the next message arrives.
            msgReadComplete = true;

            // Set socket to write now that reading has finished.
            int ops = SelectionKey.OP_WRITE;
            sc.register(selectionKey.selector(), ops, selectionKey.attachment());

    //todo should send this to the mainactivity listener so it can be customized/overridden
    /**
     * Placeholder hook for handling a completed server message; it contains no statements
     * in this listing, only the outline below.
     */
    private void onNewMessageFromServer(){
        // Take info from JSONheader to parse msgContent into individual model files

        // After parsing all models notify MainActivity that models have been updated

    // todo should be able deal with ByteBuffer from FlatBuffer rather than byte[]
    /**
     * Queues an episode for sending and registers the channel for OP_WRITE so the selector
     * loop picks it up. NullPointerException and ClosedChannelException are caught and logged.
     *
     * @param episode raw episode bytes; wrapped without copying, so the caller must not
     *                modify the array until it has been sent
     * @return the result of adding to the write queue, or {@code false} if an exception was
     *         caught before the add
     */
    public boolean addEpisodeToWriteBuffer(byte[] episode){
        boolean success = false;
            // wrap() shares the caller's array; no copy is made at this point.
            ByteBuffer bb = ByteBuffer.wrap(episode);
            success = writeBufferVector.add(bb);
            Log.v(TAG, "Added data to writeBuffer");
            int ops = SelectionKey.OP_WRITE;
            // Start of the timing interval reported by write() once the send completes.
            socketWriteTimeStart = System.nanoTime();
            // NOTE(review): per the surrounding description this is invoked from the main
            // activity, i.e. not the selector thread. Depending on the runtime, register() may
            // wait for, or go unnoticed by, a select() already in progress; a selector.wakeup()
            // may be needed — confirm.
            sc.register(selector, ops, this);
            // I want this to trigger the selector that this channel is writeReady.
        } catch (NullPointerException | ClosedChannelException e){
            Log.e(TAG,"Error", e);
            Log.e(TAG, " not initialized yet");
        return success;


在调用 ActivityManager.getMemoryClass 方法之后,我发现 Pixel 3a 的硬性上限是 192 MB。当我试图分配略多于 200 MB 时,就触到了这个上限。
除非有人知道如何解决这个问题,否则我只能写一些逻辑:当 episode 超过某个大小时把它分段写入文件,然后通过通道分段发送。我猜这会让速度慢一些,所以如果有人能给出避免这样做的答案,或者告诉我为什么只要实现得当就不会变慢,我很乐意采纳你的答案。我之所以把这段内容作为答案发布,是因为它确实回答了我最初的问题,尽管并不令人满意。
