你的位置:首页 > 数据库

[数据库]HBase(二): c#访问HBase之股票行情Demo


     上一章完成了c#访问hbase的sdk封装,接下来以一个具体Demo对sdk进行测试验证。场景:每5秒抓取指定股票列表的实时价格波动行情,数据下载后,一方面实时刷新UI界面,另一方面将数据放入到在内存中模拟的MQ (实际生产情况,可用kafka等集群代替)->存入HBase数据库。提供按指定时间范围股票价格数据查询。

目录:

  • 示例说明
  • 示例效果图
  • rest server运行状态检查
  • 获取股票实时数据代码
  • 数据持续化至Hbase代码
  • 从HBase读取数据代码

示例说明:

  • 在Hbase 中创建两个表,分别为:
  1. StocksInfo (股票信息表,用来存储设置的股票代码、股票名称)
  2. StockRealInfo (股票实时行情数据,包含开盘价、当前价、最高价、最低价、五档竞买、卖单价和数量、成交单价、数量、涨跌幅等)
  • 每5秒钟抓取StocksInfo表中所有股票的数据,自动更新UI,持续化到HBase;支持增加、删除要监控的股票列表。
  • 提供按指定时间范围从hbase中查询历史数据

示例效果图:

  • 历史数据查询:

 

rest server运行状态检查:

  • 在 HDP2.4安装(五):集群及组件安装 章节,Hbase 主机安装在 hdp4 192.168.2.21 上,使用xshell 工具连接到hbase master(hdp4)
  • 查看8080端口是否正常,也可从 ambari UI 界面查看HBase状态,如图:

获取股票实时数据代码:

  •  好多的网站提供股票实时交易数据的下载,我选择的是从 hq.sina 下载,注意抓取数据的频度不要设置的太高,否则你的IP可能会被封掉,代码如下:
    public class SnatchFormSina   {    #region SnatchFormSina    HttpClient client;    private const string dataurl = "http://hq.sinajs.cn/list={0}";    public SnatchFormSina()    {      this.client = new HttpClient();    }    /// <summary>    ///     /// </summary>    public static SnatchFormSina Current    {      get {        return new SnatchFormSina();      }    }    #endregion    #region GetCurrentInfos    /// <summary>    ///     /// </summary>    /// <param name="stockIDs"></param>    /// <returns></returns>    public async Task<List<StockRealInfo>> GetCurrentInfosAsync(List<string> stockIDs)    {      List<StockRealInfo> list = new List<StockRealInfo>();      string dataUrl = this.ParseStockIDs(stockIDs);      dataUrl = dataUrl.Substring(0, dataUrl.Length - 1);      string realInfo = await this.client.GetStringAsync(dataUrl);      string[] infos = realInfo.Split('\n');      StockRealInfo stockInfo;      foreach (string info in infos)      {        if (string.IsNullOrEmpty(info))          continue;        stockInfo = new StockRealInfo(info);        stockInfo.ID = SimulatorCache.StockAccount[stockInfo.Name];        SimulatorCache.StockInfos[stockInfo.ID] = stockInfo;        list.Add(stockInfo);      }      return list;    }    #endregion    #region ParseStockIDs    /// <summary>    ///     /// </summary>    /// <param name="stockIDs"></param>    /// <returns></returns>    private string ParseStockIDs(List<string> stockIDs)    {      StringBuilder sb = new StringBuilder();      foreach(string id in stockIDs)      {         if (id.Substring(0, 2) == "60")//上海是600打头        {          sb.Append(string.Format("sh{0},", id));        }        else if (id.Substring(0, 2) == "51")//上海基金        {          sb.Append(string.Format("sh{0},", id));        }        else //if (stockIDs.Substring(0, 2) == "00")//深圳        {          sb.Append(string.Format("sz{0},", id));        }      }      sb[sb.Length - 1].ToString().Replace(",", "");      return string.Format(dataurl, sb.ToString());    }    #endregion    #region ValiateStockID    /// <summary>    ///     /// </summary>    /// <param name="stockIDs"></param>    /// <returns></returns>    public async Task<string> ValiateStockID(string stockID)    {      string name = string.Empty;      string dataUrl = this.ParseStockIDs(new List<string> { stockID });      dataUrl = dataUrl.Substring(0, dataUrl.Length - 1);      string realInfo = await this.client.GetStringAsync(dataUrl);      string[] infos = realInfo.Split('\n');      StockRealInfo stockInfo;      foreach (string info in infos)      {        if (string.IsNullOrEmpty(info))          continue;        stockInfo = new StockRealInfo(info);        name = stockInfo.Name;      }      return name;    }    #endregion  }

    View Code

数据持续化到Hbase代码示例:

  • 代码中Utils.HBaseClient 是在一个工具类里面创建一个HBaseClient实例
    public class StockRealWriter  {    #region StockRealWriter    Queue<StockRealInfo> queue = new Queue<StockRealInfo>();    // use multithread write    Thread writerThread;    bool threadRunning = true;    const string HBASESTOCKTBLNAME = "StockRealInfo";    public StockRealWriter()    {      // Start a thread for writting to HBase      Task task = new Task(WriterThreadFunction);      task.Start();    }    ~StockRealWriter()    {      threadRunning = false;    }    #endregion    #region WriterThreadFunction    /// <summary>    /// WriterThreadFunction    /// </summary>    public void WriterThreadFunction()    {      while (threadRunning)      {        if (queue.Count > 0)        {          lock (queue)          {            CellSet set = new CellSet();            do            {              StockRealInfo stock = queue.Dequeue();              this.CreateStockByRealInfos(set, stock);            } while (queue.Count > 0);            Utils.HBaseClient.StoreCellsAsync(HBASESTOCKTBLNAME, set);          }        }        Thread.Sleep(5000);      }    }    #endregion    #region CreateStockByRealInfos    /// <summary>    ///     /// </summary>    /// <param name="set"></param>    /// <param name="info"></param>    private void CreateStockByRealInfos(CellSet set, StockRealInfo info)    {      string key = string.Format("{0}_{1}_{2}", info.ID, info.Date, info.Time);      var row = new CellSet.Row { key = Encoding.UTF8.GetBytes(key) };            var value = new Cell { column = Encoding.UTF8.GetBytes("d:ID"), data = Encoding.UTF8.GetBytes(info.ID) };      row.values.Add(value);      value = new Cell { column = Encoding.UTF8.GetBytes("d:Name"), data = Encoding.UTF8.GetBytes(info.Name) };      row.values.Add(value);      //今日开盘价      value = new Cell { column = Encoding.UTF8.GetBytes("d:TodayOpen"), data = Encoding.UTF8.GetBytes(info.TodayOpen) };      row.values.Add(value);      //昨日收盘价      value = new Cell { column = Encoding.UTF8.GetBytes("d:YesterdayClose"), data = Encoding.UTF8.GetBytes(info.YesterdayClose) };      row.values.Add(value);      //当前价格      value = new Cell { column = Encoding.UTF8.GetBytes("d:Current"), data = Encoding.UTF8.GetBytes(info.Current) };      row.values.Add(value);      //今日最高价      value = new Cell { column = Encoding.UTF8.GetBytes("d:High"), data = Encoding.UTF8.GetBytes(info.High) };      row.values.Add(value);      //今日最低价      value = new Cell { column = Encoding.UTF8.GetBytes("d:Low"), data = Encoding.UTF8.GetBytes(info.Low) };      row.values.Add(value);      //竟买价 买1      value = new Cell { column = Encoding.UTF8.GetBytes("d:Buy"), data = Encoding.UTF8.GetBytes(info.Buy) };      row.values.Add(value);      //竟卖价 卖1      value = new Cell { column = Encoding.UTF8.GetBytes("d:Sell"), data = Encoding.UTF8.GetBytes(info.Sell) };      row.values.Add(value);      // 成交数 单位股数 通常除于100成为手      value = new Cell { column = Encoding.UTF8.GetBytes("d:VolAmount"), data = Encoding.UTF8.GetBytes(info.VolAmount) };      row.values.Add(value);      // 成交多少钱,单位元      value = new Cell { column = Encoding.UTF8.GetBytes("d:VolMoney"), data = Encoding.UTF8.GetBytes(info.VolMoney) };      row.values.Add(value);      // 日期      value = new Cell { column = Encoding.UTF8.GetBytes("d:Date"), data = Encoding.UTF8.GetBytes(info.Date) };      row.values.Add(value);      // 时间       value = new Cell { column = Encoding.UTF8.GetBytes("d:Time"), data = Encoding.UTF8.GetBytes(info.Time) };      row.values.Add(value);      // 差额      value = new Cell { column = Encoding.UTF8.GetBytes("d:Diff"), data = Encoding.UTF8.GetBytes(info.Diff) };      row.values.Add(value);      // 百分比      value = new Cell { column = Encoding.UTF8.GetBytes("d:DiffPrec"), data = Encoding.UTF8.GetBytes(info.DiffPrec) };      row.values.Add(value);      DataRow buyInfo;      for(int i=0;i<5;i++)      {        buyInfo = info.BuyList.Rows[i];                value = new Cell { column = Encoding.UTF8.GetBytes(string.Format("d:Price0{0}",i.ToString())), data = Encoding.UTF8.GetBytes(Convert.ToString(buyInfo["Price"])) };        row.values.Add(value);        value = new Cell { column = Encoding.UTF8.GetBytes(string.Format("d:Amount0{0}", i.ToString())), data = Encoding.UTF8.GetBytes(Convert.ToString(buyInfo["Amount"])) };        row.values.Add(value);      }      DataRow sellInfo;      for (int i = 0; i < 5; i++)      {        sellInfo = info.SellList.Rows[i];        value = new Cell { column = Encoding.UTF8.GetBytes(string.Format("d:Price1{0}", i.ToString())), data = Encoding.UTF8.GetBytes(Convert.ToString(sellInfo["Price"])) };        row.values.Add(value);        value = new Cell { column = Encoding.UTF8.GetBytes(string.Format("d:Amount1{0}", i.ToString())), data = Encoding.UTF8.GetBytes(Convert.ToString(sellInfo["Amount"])) };        row.values.Add(value);      }            set.rows.Add(row);    }    #endregion    #region WriteStock    /// <summary>    ///     /// </summary>    /// <param name="stockInfo"></param>    public void WriteStock(List<StockRealInfo> stockInfos)    {      lock (queue)      {        foreach(var stockInfo in stockInfos)        {          queue.Enqueue(stockInfo);        }      }    }    #endregion  }

    View Code

从HBase读取数据代码:

  • 代码中 Scanner 参数是指设置的查询范围 (设置StartRow、EndRow、Batch等参数)
    public class StockRealReader  {    #region StockRealReader    const string HBASESTOCKTBLNAME = "StockRealInfo";    public StockRealReader()    {    }    #endregion    #region QueryStockRealAsync    public async Task<List<StockRealInfo>> QueryStockRealAsync(Scanner query)    {      List<StockRealInfo> list = new List<StockRealInfo>();            ScannerInformation info = await Utils.HBaseClient.CreateScannerAsync(HBASESTOCKTBLNAME, query);      CellSet next;      while ((next = await Utils.HBaseClient.ScannerGetNextAsync(info)) != null)      {        StockRealInfo realInfo;        foreach (CellSet.Row row in next.rows)        {          realInfo = new StockRealInfo();          //开盘价          var temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:TodayOpen");          realInfo.TodayOpen = Encoding.UTF8.GetString(temp.data);          //昨日收盘价          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:YesterdayClose");          realInfo.YesterdayClose = Encoding.UTF8.GetString(temp.data);          //当前价格          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:Current");          realInfo.Current = Encoding.UTF8.GetString(temp.data);          //今日最高价          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:High");          realInfo.High = Encoding.UTF8.GetString(temp.data);          //今日最低价          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:Low");          realInfo.Low = Encoding.UTF8.GetString(temp.data);          //竟买价 买1          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:Buy");          realInfo.Buy = Encoding.UTF8.GetString(temp.data);          //竟卖价 卖1          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:Sell");          realInfo.Sell = Encoding.UTF8.GetString(temp.data);          //成交数 单位股数 通常除于100成为手          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:VolAmount");          realInfo.VolAmount = Encoding.UTF8.GetString(temp.data);          //成交多少钱,单位元          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:VolMoney");          realInfo.VolMoney = Encoding.UTF8.GetString(temp.data);          //日期          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:Date");          realInfo.Date = Encoding.UTF8.GetString(temp.data);          //时间          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:Time");          realInfo.Time = Encoding.UTF8.GetString(temp.data);          //差额          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:Diff");          realInfo.Diff = Encoding.UTF8.GetString(temp.data);          //百分比          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:DiffPrec");          realInfo.DiffPrec = Encoding.UTF8.GetString(temp.data);          list.Add(realInfo);                }      }      return list;    }    #endregion  }

    View Code