winforms 如何从Interactive Brokers(IB)C#示例应用程序获取多个历史数据请求,(线程UI问题,后台线程)

kzmpq1sx  于 2023-05-07  发布在  C#
关注(0)|答案(1)|浏览(181)

我认为这个问题需要了解Interactive Broker随API一起提供的C#示例应用程序是如何工作的。我试图修改IB的示例C#应用程序,该应用程序随API一起提供,以请求多个历史数据系列,但我遇到了多个问题。似乎我需要让程序等到当前的数据请求完成后再提交第二个,第三个。等等等等。如果我添加以下代码(假设我添加了一个按钮来启动该过程),第二个请求似乎“运行”第一个请求,扰乱了应用程序中每个系列生成的图表。即数据合并到一个图表中。此外,过多的请求导致IB开始拒绝数据请求的竞争条件。本例中的代码为:

private void button7_Click(object sender, EventArgs e)
{
    if (IsConnected)
    {
        srInputTickerFile = new StreamReader(stInputTickerFileName);
        if (!inputTickerFileOpenFlag)
        {
            srInputTickerFile = new StreamReader(stInputTickerFileName);
            inputTickerFileOpenFlag = true;
        }

        while (srInputTickerFile.Peek() >= 0)
        {
            String line = srInputTickerFile.ReadLine();
            String[] lineSplit = line.Split(',');
            string ticker = lineSplit[0];
            string timeDateTo = lineSplit[1];
            string myDuration = lineSplit[2];
            string myBarSize = lineSplit[3];
            string myWhatToShow = lineSplit[4];
            int myRTH = Int32.Parse(lineSplit[5]);
            Contract contract = GetMDContractForHistorical(line);
            contract.ConId = histCounter + 1;
            string endTime = hdRequest_EndTime.Text.Trim();
            string duration = hdRequest_Duration.Text.Trim() + " " +
                hdRequest_TimeUnit.Text.Trim();
            string barSize = hdRequest_BarSize.Text.Trim();
            string whatToShow = hdRequest_WhatToShow.Text.Trim();
            int outsideRTH = contractMDRTH.Checked ? 1 : 0;
            historicalDataManager.AddRequest(contract, timeDateTo, myDuration, 
            myBarSize, myWhatToShow, myRTH, 1, cbKeepUpToDate.Checked);
            historicalDataTab.Text = Utils.ContractToString(contract) + " (HD)";
            ShowTab(marketData_MDT, historicalDataTab);
        }
        srInputTickerFile.Close();
    }
}

我也尝试过其他方法,比如在函数调用后添加

historicalDataManager.AddRequest

下面的代码,以便它可以处理请求。

System.Threading.Thread.Sleep(3000);

这里的问题似乎是应用程序将控制权传递回UI线程来处理图形,从而阻止了图表的显示。
如果我尝试使用后台工作线程,并将代码的“while”部分放入其中。也就是说

new Thread(() =>
{
    Thread.CurrentThread.IsBackground = true;
    //while code above goes here
}).Start();

此代码在historicalDataManager.AddRequest处给予错误,因为它驻留在UI线程上,而不是工作线程上。如果我在while循环中添加代码来更新应用程序中的文本框,它也会生成错误,因为它们驻留在UI线程中。
所以我想可以归结为:
1.如何正确地节流数据请求。
1.如果使用后台工作线程,如何访问IB应用程序中UI线程上的函数调用。

i7uaboj4

i7uaboj41#

所以我想可以归结为:1)如何正确地节流数据请求。2)如果使用后台工作线程,如何访问IB应用中的UI线程上的函数调用。

**1)如何正确地节流数据请求。**这取决于你想走多远。如果你想要一个简单的解决方案,你可以使用...

// On UI thread, non-blocking
Task.Delay(1000).ContinueWith(() => {
    // make request 2
}

为了完全控制所有请求,并保持50/秒的API消息限制和6/min(软)或历史数据请求的短期突发限制,我建议至少2个基于生产者/消费者模型的独立线程。
一个基本的大纲,让你开始如下...(替代上述任务延迟)

// The data class used to queue a request
class Request
{
    public int? reqId;
    public ReqType? reqType, reqFrom;
    public bool snapshot = false, regulatorySnaphsot = false;
    public string? genericTickList;

    // For historical data
    public string? endDateTime, duration, barSize, show;
    public int? useRTH, formatDate;
    public bool update = false;

    public Request(int reqId, ReqType reqType, Contract contract, string endDateTime, string duration, string barSize, string show, int useRTH, int formatDate, bool update)
    {
        // Historical data
        this.reqId = reqId;
        this.reqType = reqType;
        this.contract = contract;
        this.endDateTime = endDateTime;
        this.duration = duration;
        this.barSize = barSize;
        this.show = show;
        this.useRTH = useRTH;
        this.formatDate = formatDate;
        this.update = update;
    }
}

/*
    * This class receives all requests, queues them and process with delay to avoid exceeding message rate limitations
    * It also needs to take care of all outstanding requests, so requests for data can be cancelled in one place.
*/
class Requests<T>:IDisposable where T : Request
{
    private readonly IBClient ibClient;
    private readonly object msgLock = new();
    private readonly object historyLock = new();
    private readonly Thread msgThread, contractThread, mktDataThread, historyThread;
    private readonly Queue<Request> msgQueue = new();
    private readonly Queue<Request> historyQueue = new();
    private readonly static AutoResetEvent waitHistory = new(true);
    private readonly Dictionary<int, Request> historyReqs = new();

    public Requests(IBClient ibClient)
    {
        this.ibClient = ibClient;
        msgThread = new Thread(ConsumeMsg);
        msgThread.Start();

        historyThread = new Thread(ConsumeHistory);
        historyThread.Start();
    }

    private void EnqueueMsg(T req)
    {
        lock(msgLock)
        {
            msgQueue.Enqueue(req);
            Monitor.PulseAll(msgLock);
        }
    }
    private void ConsumeMsg()
    {
        /*
            * The message queue does not wait other than ~25ms for rate limitation of messages (50/sec max).
            * Other queues are responsible for limiting request rates depending on request type.
            * We do not increment any counters here, that is done in the respective queue that rate limits requests by type
            * EVERY counter is decremented here!
        */
        while(true)
        {
            Request req;
            lock(msgLock)
            {
                while(msgQueue.Count == 0)
                    Monitor.Wait(msgLock);    // Wait for next Task in queue
                req = msgQueue.Dequeue();
                if(req == null)
                    return;      // This signals our exit
            }

            switch(req.reqType)
            {
                case ReqType.History:
                    ibClient.ClientSocket.reqHistoricalData((int)req.reqId, req.contract, req.endDateTime, req.duration, req.barSize, req.show, req.useRTH ?? 1, req.formatDate ?? 1, req.update, new List<TagValue>());
                    break;

                case ReqType.CancelHistory:
                    ibClient.ClientSocket.cancelHistoricalData((int)req.reqId);
                    historyReqs.Remove((int)req.reqId);
                    break;
            }
            Thread.Sleep(20); // This prevents over 50 msg per second.
        }
    }

    public void HistoricalData(int reqId, Contract contract, string endDateTime, string duration, string barSize, string show, int useRTH, int formatDate, bool update)
    {
        EnqueueHistoryRequest((T)new Request(reqId, ReqType.History, contract, endDateTime, duration, barSize, show, useRTH, formatDate, update));
    }
    public void CancelHistoryData(int reqId)
    {
        EnqueueMsg((T)new Request(reqId, ReqType.CancelHistory));
    }
    private void EnqueueHistoryRequest(T req)
    {
        lock(historyLock)
        {
            historyQueue.Enqueue(req);
            Monitor.PulseAll(historyLock);
        }
    }
    private void ConsumeHistory()
    {
        while(true)
        {
            Request req;
            lock(historyLock)
            {
                while(historyQueue.Count == 0)
                    Monitor.Wait(historyLock);      // Wait for next Task in queue
                req = historyQueue.Dequeue();
            }
            if(req == null) return;         // This signals our exit
            

            EnqueueMsg((T)req);
            // We actually have a soft 6/min limit on hist data.
            // This delay does not follow that, we're limiting the number to 50 instead.
            Thread.Sleep(800);
        }
    }
    public void HistDataRecieved(int reqID)
    {
        historyReqs.Remove(reqID);
        if(!waitHistory.SafeWaitHandle.IsClosed && ReqCntHistory < MaxReqHistory)
            waitHistory.Set();     // We can proceed if less than maxHistory requests outstanding
    }
}

要使用,请在UI窗体中

Requests = new(iBclient);
requests.Add(new HistoricalData(......));

2)如果使用后台工作线程,如何访问IB应用中UI线程上的函数调用。

iBclient类在单独的线程上运行,并将事件和数据发送回UI线程。这意味着你的UI线程只在接收到数据的时候才工作,而不需要在UI线程上执行Thread.Sleep或DoEvents。
您可以通过注册如下所示在UI线程上接收这些事件…

private readonly EReaderMonitorSignal signal;
private readonly IBClient ibClient;

partial class MyForm:Form
{
    public MyForm()
    {
        InitializeComponent();
        signal = new EReaderMonitorSignal();
        ibClient = new(signal);

        // Register for events

        // Depending on your API version use one of the following to know when connected.
        ibClient.ConnectedAndReady += Recv_ConnectedAndReady;
        ibClient.NextOrderId += Recv_NextOrderId;
        
        ibClient.Error += Recv_Error;
        ibClient.HistoricalData += Recv_HistoricalData;
        ibClient.HistoricalDataEnd += Recv_HistoricalDataEnd;

        // Connect and start API thread.
        ibClient.ClientId = 0;
        ibClient.ClientSocket.eConnect("127.0.0.1", 7496, 0); // (ClientId = 0) This is the master client that will receive all orders (even from TWS)
        var reader = new EReader(ibClient.ClientSocket, signal);
        reader.Start();

        new Thread(() =>
        {
            while(ibClient.ClientSocket.IsConnected())
            {
                signal.waitForSignal();
                reader.processMsgs();
            }
        })
        { IsBackground = true }.Start();
    }
        
        
    private void Recv_ConnectedAndReady(ConnectionStatusMessage msg)
    {
        // We are now connected and can make requests
        // Older API versions use NextOrderId(int)
        ibClient.ClientSocket.reqHistoricalData(reqId, contract, endDateTime, durationStr, "5 min", "Trades", 0, 0, new List<TagValue>())
    }
    
    private void Recv_Error(int reqId, int errorCode, string? str, string? ordRejectJson, Exception? ex)
    {
        // Possible combinations of parameters
        // reqId, errorCode, str, ex
        //
        // 0,0,null,Exception
        // 0,0,string,null
        // -1,int,string,null
        // int,int,string,null
    }
    
    private void Recv_HistoricalData(HistoricalDataMessage msg)
    {
        //msg.RequestId is the same Id you used to make the request. This is how you can separate data to different charts.
    }
    
    
    private void Recv_HistoricalDataEnd(HistoricalDataEndMessage msg)
    {
        //request complete.
    }
}

相关问题