手把手教你实现一个行情收集器

在程序化交易、量化交易中研究策略、设计策略、回测分析时离不开行情数据的支持。市面上的所有数据都收集也不现实,毕竟数据量太大。对于数字货币市场来说,发明者量化交易平台上支持有限的交易所、交易对的回测数据。如果想回测一些暂时不支持数据的交易所、交易对。可以使用自定义数据源来进行回测,但是这个前提是要自己有数据才行。所以就迫切需要一个行情收集程序,并且能持久化保存,最好还能实时获取。

这样可以解决几个需求,例如:

  • 可以给多个机器人提供数据源,可以缓解每个机器人访问交易所接口的频率。
  • 可以让机器人启动时,获取一个K线BAR数量足够多的K线数据,再也不用担心机器人起始的时候K线BAR数量不足了。
  • 可以收集小币种行情数据,用来给发明者量化交易平台回测系统提供自定义数据源,从而使用回测系统回测策略。
  • 等等..

计划使用python实现,为什么?因为很方便 ?
有了需求,动手!

准备

python的pymongo库

因为要用到数据库,做持久化保存。数据选择使用MongoDB,使用Python语言写收集程序,所以需要这个数据库的驱动库。
在Python上安装pymongo即可。

在托管者所在设备安装MongoDB

例如:MAC安装MongoDB,当然WIN系统安装MongoDB也差不多,网上有很多教程,以在苹果MAC系统安装为例:

下载
下载链接:https://www.mongodb.com/download-center?jmp=nav#community

解压缩
下载后,解压缩到目录:/usr/local

配置环境变量
终端输入:open -e .bash_profile,打开文件后,写入:export PATH=${PATH}:/usr/local/MongoDB/bin
保存后,终端使用source .bash_profile使修改生效。

手动配置数据库文件目录和日志目录
创建目录/usr/local/data/db中对应的文件夹。
创建目录/usr/local/data/logs中对应的文件夹。

编辑配置文件```mongo.conf```:
```
#bind_ip_all = true                  # 任何机器可以连接
bind_ip = 127.0.0.1                  # 本机和192.168.0.3可以访问
port = 27017                         # 实例运行在27017端口(默认)
dbpath = /usr/local/data/db # 数据文件夹存放地址(db要预先创建)
logpath = /usr/local/data/logs/mongodb.log # 日志文件地址
logappend = false                    # 启动时 添加还是重写日志文件
fork = false                         # 是否后台运行
auth = false                         # 开启校验用户
```

运行MongoDB服务

命令:
```
./mongod -f mongo.conf
```

停止服务

```
use admin;
db.shutdownServer();
```

实现收集器程序

收集器以发明者量化交易平台上的Python机器人策略形式运行。由于本人Python水平有限,只是实现了一个简单的例子,用于展示本文的思路。

收集器策略代码:

import pymongo
import json

def main():
    Log("测试数据收集")
    
    # 连接数据库服务
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   # mongodb://127.0.0.1:27017
    # 创建数据库
    huobi_DB = myDBClient["huobi"]
    
    # 打印目前数据库表
    collist = huobi_DB.list_collection_names()
    Log("collist:", collist)
    
    # 检测是否删除表
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = huobi_DB[dropName]
                Log("dropName:", dropName, "删除:", dropName)
                ret = tab.drop()
                collist = huobi_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "删除失败")
                else :
                    Log(dropName, "删除成功")
    
    # 创建records表
    huobi_DB_Records = huobi_DB["records"]
    
    # 请求数据
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # 首次写入所有BAR数据
            for i in range(len(r) - 1):
                # 逐根写入
                bar = r[i]
                huobi_DB_Records.insert_one({"index": index, "High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            huobi_DB_Records.insert_one({"index": index, "High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        Sleep(10000)
        

完整策略地址:链接

使用数据

创建使用数据的策略机器人。
注意:需要勾选上「画线类库」,没有的话可以去复制一个到自己策略库。

import pymongo
import json

def main():
    Log("测试使用数据库数据")
    
    # 连接数据库服务
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   # mongodb://127.0.0.1:27017
    # 创建数据库
    huobi_DB = myDBClient["huobi"]
    
    # 打印目前数据库表
    collist = huobi_DB.list_collection_names()
    Log("collist:", collist)
    
    # 查询数据打印
    huobi_DB_Records = huobi_DB["records"]
    
    while True:
        arrRecords = []
        for x in huobi_DB_Records.find():
            bar = {
                "High": x["High"], 
                "Low": x["Low"], 
                "Close": x["Close"], 
                "Open": x["Open"], 
                "Time": x["Time"], 
                "Volume": x["Volume"]
            }
            arrRecords.append(bar)
        
        # 使用画线类库,把取到的K线数据画出来
        ext.PlotRecords(arrRecords, "K")
        LogStatus(_D(), "records length:", len(arrRecords))
        Sleep(10000)

可以看到使用数据的策略机器人代码中没有访问任何交易所接口,通过访问数据库获取数据,行情收集器程序没有记录当前BAR的数据,收集的是已经完成状态的K线BAR,如果需要当前BAR实时数据,稍加修改即可。
当前的例子代码,只是为了演示,在访问数据库中表内的数据记录时是全部获取,这样随着收集数据时间增长,收集数据越来越多,全部查询出来会一定程度上影响性能,可以设计成只查询比当前数据新的数据,添加到当前数据中。

运行

运行托管者程序

在托管者所在设备,运行起来MongoDB数据库服务
./mongod -f mongo.conf

收集器运行,收集发明者量化交易平台的模拟盘wexAppBTC_USDT交易对:
地址:wexApp

使用数据库数据的机器人A:

使用数据库数据的机器人B:

wexApp页面:

图中可以看到,不同ID的机器人,共享使用一个数据源的K线数据。

收集任意周期的K线数据

依托于发明者量化交易平台的强大功能,我们可以轻松实现收集任意周期的K线数据。
比如,我要收集3分钟K线,交易所没有3分钟K线怎么办?没关系,可以轻松实现。

我们修改收集器机器人的配置,K线周期设置为3分钟,发明者量化交易平台会自动合成3分钟K线给收集器程序。

我们使用参数删除表的名称,设置:["records"]删除之前收集的1分钟K线数据表。准备收集3分钟K线数据。

启动收集器程序,再启动使用数据的策略机器人

可以看到画出的K线图表,BAR之间间隔时间就是3分钟了,每根BAR就是3分钟周期的K线柱。下期我们尝试实现自定义数据源的需求实现。感谢阅读

免责声明:信息仅供参考,不构成投资及交易建议。投资者据此操作,风险自担。
如果觉得文章对你有用,请随意赞赏收藏
相关推荐
相关下载
登录后评论
Copyright © 2019 宽客在线