博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
程序员的量化交易之路(37)--Lean之DataStream数据流5
阅读量:5948 次
发布时间:2019-06-19

本文共 10110 字,大约阅读时间需要 33 分钟。

转载需注明出处:,

我们之前说明了数据读者,数据槽。将数据读取到队列中,在算法主线程中需要使用DataFeed线程的数据。这是一个典型的读者-写着问题。

在主线程中和DataFeed打教导的事DataStream。下面我们看它的代码。说明在注释中说明了。

/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. *  * Licensed under the Apache License, Version 2.0 (the "License");  * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org /licenses/LICENSE-2.0 *  * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. **/using System;using System.Collections.Generic;using System.Threading;using System.Diagnostics;using System.Linq;using QuantConnect.Data;using QuantConnect.Lean.Engine.DataFeeds;using QuantConnect.Logging;namespace QuantConnect.Lean.Engine{    /********************************************************     * QUANTCONNECT NAMESPACES    *********************************************************/    ///     /// Data stream class takes a datafeed hander and converts it into a synchronized enumerable data format for looping     /// 数据流类,拥有数据槽句柄,然后将数据转化为同步可枚举的,用作循环。    /// in the primary algorithm thread.    ///     public static class DataStream    {        /********************************************************         * CLASS VARIABLES        *********************************************************/        //Count of bridges and subscriptions.        //订阅数,就是addSecurity添加的证券数量        private static int _subscriptions;        /********************************************************         * CLASS PROPERTIES        *********************************************************/        ///         /// The frontier time of the data stream        ///         ///         public static DateTime AlgorithmTime { get; private set; }        /********************************************************         * CLASS METHODS        *********************************************************/        ///         /// Process over the datafeed cross thread bridges to generate an enumerable sorted collection of the data, ready for a consumer        /// to use and already synchronized in time.        ///         /// DataFeed object        /// Starting date for the data feed        /// 
public static IEnumerable
>> GetData(IDataFeed feed, DateTime frontierOrigin) { //Initialize: _subscriptions = feed.Subscriptions.Count;//订阅的证券数量 AlgorithmTime = frontierOrigin;//算法前置时间 long algorithmTime = AlgorithmTime.Ticks;//获取时间tick var frontier = frontierOrigin; var nextEmitTime = DateTime.MinValue; var periods = feed.Subscriptions.Select(x => x.Resolution.ToTimeSpan()).ToArray(); //Wait for datafeeds to be ready, wait for first data to arrive: while (feed.Bridge.Length != _subscriptions) Thread.Sleep(100); // clear data first when in live mode, start with fresh data if (Engine.LiveMode) { feed.PurgeData(); } //Get all data in queues: return as a sorted dictionary: //获取队列中的所有数据,以排序好的字典返回 while (!feed.EndOfBridges) { //Reset items which are not fill forward: long earlyBirdTicks = 0; var newData = new Dictionary
>(); // spin wait until the feed catches up to our frontier WaitForDataOrEndOfBridges(feed, frontier); for (var i = 0; i < _subscriptions; i++) { //If there's data on the bridge, check to see if it's time to pull it off, if it's in the future // we'll record the time as 'earlyBirdTicks' so we can fast forward the frontier time while (!feed.Bridge[i].IsEmpty)//第i个证券中有数据 { //Look at first item on list, leave it there until time passes this item. //检查链表中第一个条目 List
result; if (!feed.Bridge[i].TryPeek(out result)) { // if there's no item skip to the next subscription //如果这个证券没有数据,那么就直接跳到下一个证券 break; } if (result.Count > 0 && result[0].EndTime > frontier) { // we have at least one item, check to see if its in ahead of the frontier, // if so, keep track of how many ticks in the future it is if (earlyBirdTicks == 0 || earlyBirdTicks > result[0].EndTime.Ticks) { earlyBirdTicks = result[0].EndTime.Ticks; } break; } if (result.Count > 0) { // we have at least one item, check to see if its in ahead of the frontier, // if so, keep track of how many ticks in the future it is if (earlyBirdTicks == 0 || earlyBirdTicks > result[0].EndTime.Ticks) { earlyBirdTicks = result[0].EndTime.Ticks; } } //Pull a grouped time list out of the bridge List
dataPoints; if (feed.Bridge[i].TryDequeue(out dataPoints)) { // round the time down based on the requested resolution for fill forward data // this is a temporary fix, long term fill forward logic should be moved into this class foreach (var point in dataPoints) { if (algorithmTime < point.EndTime.Ticks) { // set this to most advanced end point in time, pre rounding // min 10:02 10:02:01(FF) 10:02:01.01(FF) // sec 10:02 10:02:01 10:02:01.01(FF) // tic 10:02 10:02:01 10:02:01.01 // the algorithm time should always be the 'frontier' the furthest // time within this data slice algorithmTime = point.EndTime.Ticks; } if (point.IsFillForward) { point.Time = point.Time.RoundDown(periods[i]); } } // add the list to the collection to be yielded List
dp; if (!newData.TryGetValue(i, out dp)) { dp = new List
(); newData[i] = dp; } dp.AddRange(dataPoints); } else { //Should never fail: Log.Error("DataStream.GetData(): Failed to dequeue bridge item"); } } } if (newData.Count > 0) { AlgorithmTime = new DateTime(algorithmTime); yield return newData; } //Update the frontier and start again. if (earlyBirdTicks > 0) { //Seek forward in time to next data event from stream: there's nothing here for us to do now: why loop over empty seconds frontier = new DateTime(earlyBirdTicks); } else if (feed.EndOfBridges) { // we're out of data or quit break; } //Allow loop pass through emits every second to allow event handling (liquidate/stop/ect...) if (Engine.LiveMode && DateTime.Now > nextEmitTime) { AlgorithmTime = DateTime.Now.RoundDown(periods.Min()); nextEmitTime = DateTime.Now + TimeSpan.FromSeconds(1); yield return new Dictionary
>(); } } Log.Trace("DataStream.GetData(): All Streams Completed."); } ///
/// Waits until the data feed is ready for the data stream to pull data from it. /// 等待数据槽准备好了,可以从里面拉取数据 /// ///
The IDataFeed instance populating the bridges ///
The frontier of the data stream private static void WaitForDataOrEndOfBridges(IDataFeed feed, DateTime dataStreamFrontier) { //Make sure all bridges have data to to peek sync properly. var now = Stopwatch.StartNew(); // timeout to prevent infinite looping here -- 50ms for live and 30sec for non-live var loopTimeout = (Engine.LiveMode) ? 50 : 30000; if (Engine.LiveMode) { // give some time to the other threads in live mode Thread.Sleep(1); } //Waiting for data in the bridges: while (!AllBridgesHaveData(feed) && now.ElapsedMilliseconds < loopTimeout) { Thread.Sleep(1); } //we want to verify that our data stream is never ahead of our data feed. //this acts as a virtual lock around the bridge so we can wait for the feed //to be ahead of us // if we're out of data then the feed will never update (it will stay here forever if there's no more data, so use a timeout!!) while (dataStreamFrontier > feed.LoadedDataFrontier && (!feed.EndOfBridges && !feed.LoadingComplete) && now.ElapsedMilliseconds < loopTimeout) { Thread.Sleep(1); } } ///
/// Check if all the bridges have data or are dead before starting the analysis /// /// This determines whether or not the data stream can pull data from the data feed. /// ///
Feed Interface with concurrent connection between producer and consumer ///
Boolean true more data to download
private static bool AllBridgesHaveData(IDataFeed feed) { //Lock on the bridge to scan if it has data: for (var i = 0; i < _subscriptions; i++) { if (feed.EndOfBridge[i]) continue; if (feed.Bridge[i].IsEmpty) { return false; } } return true; } ///
/// Resets the frontier time to DateTime.MinValue /// public static void ResetFrontier() { AlgorithmTime = new DateTime(); } }}

你可能感兴趣的文章
从CALSSPATH加载properties文件
查看>>
asp.net GridView激发了未处理的事件“PageIndexChanging”的分析
查看>>
MPLS
查看>>
tar.xz文件如何解压
查看>>
jquery 给textarea赋值,firefox下出现[object XMLDocument]
查看>>
我的友情链接
查看>>
2014年首届CCF软件能力认证试题 题目二
查看>>
c语言数据类型汇总
查看>>
部署Exchange 2010(二)首台MailBox服务器
查看>>
用好这块风水宝地--开博之语
查看>>
Jquery插件开发
查看>>
产品配置管理相关通知
查看>>
数据库相关算法 之 xxHash
查看>>
牛顿法/递归法实现开方的函数功能
查看>>
手势模型和Angular Material的实现
查看>>
Java转换txt文件编码(GBK转UTF-8)
查看>>
Redis-使用记录
查看>>
APICloud 微信授权登录
查看>>
Java Web开发自学笔记一:环境(tomcat、jdk的选择和安装)
查看>>
Cakephp requestAction用法
查看>>