老帅李华杰
管理员
管理员
  • UID2
  • 粉丝33
  • 关注0
  • 发帖数433
  • 社区居民
  • 忠实会员
  • 喜欢达人
  • 原创写手
阅读:60回复:7

添加涨跌停价获取功能

楼主#
更多 发布于:2025-12-05 19:04
def get_stock_limit_prices(self, stock_code):
    """获取股票的涨停价和跌停价"""
    try:
        # 方法1: 通过QMT接口获取
        # 注意:需要根据实际QMT接口调整
        try:
            # 假设通过xtdata获取
            from xtquant import xtdata
            quote = xtdata.get_full_tick([stock_code])
            if stock_code in quote:
                tick_data = quote[stock_code]
                if 'upperLimit' in tick_data and 'lowerLimit' in tick_data:
                    upper_limit = tick_data['upperLimit']
                    lower_limit = tick_data['lowerLimit']
                    if upper_limit > 0 and lower_limit > 0:
                        return {'upper': upper_limit, 'lower': lower_limit}
        except ImportError:
            pass
        except Exception as e:
            logging.warning(f"通过xtdata获取涨跌停价失败: {str(e)}")
        
        # 方法2: 根据历史数据计算
        try:
            from xtquant import xtdata
            # 获取前复权日线数据
            market_data = xtdata.get_market_data(
                field_list=['preClose'],
                stock_list=[stock_code],
                period='1d',
                count=1
            )
            if 'preClose' in market_data and len(market_data['preClose']) > 0:
                pre_close = market_data['preClose'][0][0]
                if pre_close > 0:
                    # 判断是否为ST股票(涨跌幅限制为5%)
                    if 'ST' in stock_code or '*ST' in stock_code:
                        limit_rate = 0.05
                    else:
                        limit_rate = 0.10
                    
                    upper_limit = round(pre_close * (1 + limit_rate), 2)
                    lower_limit = round(pre_close * (1 - limit_rate), 2)
                    return {'upper': upper_limit, 'lower': lower_limit}
        except Exception as e:
            logging.warning(f"通过历史数据计算涨跌停价失败: {str(e)}")
        
        # 方法3: 根据股票代码前缀判断(最后的手段)
        # 这里提供一个简单的估算,实际使用时需要更精确的逻辑
        logging.warning(f"无法获取{stock_code}的涨跌停价,使用默认估算")
        return {'upper': 999999, 'lower': 0.01}  # 返回一个宽泛的范围
        
    except Exception as e:
        logging.error(f"获取涨跌停价异常: {str(e)}")
        return {'upper': 999999, 'lower': 0.01}
好的指标等于至高的阵地,明察秋毫自然马到成功; 微信手机同号:15907742318 老帅李华杰
老帅李华杰
管理员
管理员
  • UID2
  • 粉丝33
  • 关注0
  • 发帖数433
  • 社区居民
  • 忠实会员
  • 喜欢达人
  • 原创写手
沙发#
发布于:2025-12-05 19:05
修改价格计算函数
def calculate_order_price(self, signal_price, signal_type, stock_code=None):
    """计算委托价格,考虑涨跌停价限制"""
    try:
        if signal_price <= 0:
            logging.error(f"信号价格无效: {signal_price}")
            return signal_price, xtconstant.FIX_PRICE
        
        # 使用市价单的情况
        if self.price_strategy['use_market_price']:
            return 0, xtconstant.LATEST_PRICE
        
        # 限价单,根据信号类型计算价格
        if signal_type == "买入":
            # 买入上浮1%
            base_price = signal_price * (1 + self.price_strategy['buy_premium'])
            
            # 如果提供了股票代码,检查涨停价限制
            if stock_code:
                limit_prices = self.get_stock_limit_prices(stock_code)
                if base_price > limit_prices['upper']:
                    logging.info(f"买入价格{base_price:.2f}超过涨停价{limit_prices['upper']:.2f},调整为涨停价")
                    base_price = limit_prices['upper']
            
            order_price = self.adjust_price_to_tick(base_price)
            
            # 确保价格大于0且小于等于涨停价
            if order_price <= 0:
                logging.warning(f"计算出的买入价格无效: {order_price},使用信号价格")
                order_price = signal_price
            
            return order_price, xtconstant.FIX_PRICE
        
        elif signal_type == "卖出":
            # 卖出下浮1%
            base_price = signal_price * (1 - self.price_strategy['sell_discount'])
            
            # 如果提供了股票代码,检查跌停价限制
            if stock_code:
                limit_prices = self.get_stock_limit_prices(stock_code)
                if base_price < limit_prices['lower']:
                    logging.info(f"卖出价格{base_price:.2f}低于跌停价{limit_prices['lower']:.2f},调整为跌停价")
                    base_price = limit_prices['lower']
            
            order_price = self.adjust_price_to_tick(base_price)
            
            # 确保价格大于0且大于等于跌停价
            if order_price <= 0:
                logging.warning(f"计算出的卖出价格无效: {order_price},使用信号价格")
                order_price = signal_price
            
            return order_price, xtconstant.FIX_PRICE
        
        else:
            logging.warning(f"未知信号类型: {signal_type}")
            return signal_price, xtconstant.FIX_PRICE
            
    except Exception as e:
        logging.error(f"计算委托价格异常: {str(e)}")
        return signal_price, xtconstant.FIX_PRICE
好的指标等于至高的阵地,明察秋毫自然马到成功; 微信手机同号:15907742318 老帅李华杰
老帅李华杰
管理员
管理员
  • UID2
  • 粉丝33
  • 关注0
  • 发帖数433
  • 社区居民
  • 忠实会员
  • 喜欢达人
  • 原创写手
板凳#
发布于:2025-12-05 19:05
修改执行交易函数
def execute_trade(self, signal):
    """执行交易"""
    try:
        stock_code = signal['code']
        signal_type = signal['type']
        signal_price = signal.get('price', 0)
        condition = signal.get('condition', '')
        
        # 验证信号价格
        if signal_price <= 0:
            logging.error(f"信号价格无效(小于等于0): {signal_price},忽略交易信号")
            return False
        
        self.total_signals += 1
        
        # 计算下单数量
        order_volume = self.calculate_order_volume(stock_code, signal_price, signal_type)
        
        # 计算委托价格和价格类型,传入股票代码以便检查涨跌停价
        order_price, price_type = self.calculate_order_price(signal_price, signal_type, stock_code)
        
        # 对限价单进行价格验证
        if price_type == xtconstant.FIX_PRICE:
            if order_price <= 0:
                logging.error(f"限价单委托价格无效: {order_price:.2f},无法执行交易")
                return False
            
            # 获取涨跌停价进行最终验证
            limit_prices = self.get_stock_limit_prices(stock_code)
            
            if signal_type == "买入":
                if order_price > limit_prices['upper']:
                    logging.error(f"买入价格{order_price:.2f}超过涨停价{limit_prices['upper']:.2f},取消交易")
                    return False
            elif signal_type == "卖出":
                if order_price < limit_prices['lower']:
                    logging.error(f"卖出价格{order_price:.2f}低于跌停价{limit_prices['lower']:.2f},取消交易")
                    return False
        
        price_type_text = "市价" if price_type == xtconstant.LATEST_PRICE else f"限价{order_price:.2f}"
        logging.info(f"执行交易信号: {signal['name']}({stock_code}) {signal_type} @ {signal_price:.2f} → {price_type_text} 数量: {order_volume}")
        
        # 风险控制
        if not self.check_risk_control(stock_code, signal_type, order_volume, signal_price):
            limit_pass, limit_msg = self.check_trade_limits(signal_type)
            if not limit_pass:
                self.rejected_by_limit += 1
                logging.warning(f"因交易限制被拒绝: {limit_msg}")
            return False
        
        # 确定交易方向
        if signal_type == "买入":
            # 买入信号
            if price_type == xtconstant.LATEST_PRICE:
                # 市价单 - 价格参数必须为0
                logging.info(f"执行市价买入: {stock_code} 数量: {order_volume}")
                order_id = self.trader.order_stock(
                    self.account, stock_code, xtconstant.STOCK_BUY,
                    order_volume, price_type, 0,  # 市价单价格参数必须为0
                    'TDXMonitor', f"通达信{condition}"
                )
            else:
                # 限价单 - 价格参数必须大于0
                logging.info(f"执行限价买入: {stock_code} 价格: {order_price:.2f} 数量: {order_volume}")
                order_id = self.trader.order_stock(
                    self.account, stock_code, xtconstant.STOCK_BUY,
                    order_volume, price_type, order_price,
                    'TDXMonitor', f"通达信{condition}"
                )
            
            if order_id:
                # 更新持仓记录
                actual_price = order_price if order_price > 0 else signal_price
                self.update_position_record(stock_code, actual_price, order_volume)
                
        elif signal_type == "卖出":
            # 卖出信号
            positions = self.trader.query_stock_positions(self.account)
            current_volume = 0
            for pos in positions:
                if pos.stock_code == stock_code:
                    current_volume = pos.volume
                    break
            
            sell_volume = min(order_volume, current_volume)
            if sell_volume <= 0:
                logging.warning(f"无持仓可卖: {stock_code}")
                return False
            
            if price_type == xtconstant.LATEST_PRICE:
                # 市价单 - 价格参数必须为0
                logging.info(f"执行市价卖出: {stock_code} 数量: {sell_volume}")
                order_id = self.trader.order_stock(
                    self.account, stock_code, xtconstant.STOCK_SELL,
                    sell_volume, price_type, 0,  # 市价单价格参数必须为0
                    'TDXMonitor', f"通达信{condition}"
                )
            else:
                # 限价单 - 价格参数必须大于0
                logging.info(f"执行限价卖出: {stock_code} 价格: {order_price:.2f} 数量: {sell_volume}")
                order_id = self.trader.order_stock(
                    self.account, stock_code, xtconstant.STOCK_SELL,
                    sell_volume, price_type, order_price,
                    'TDXMonitor', f"通达信{condition}"
                )
            
            if order_id:
                # 移除持仓记录
                self.remove_position_record(stock_code, sell_volume)
        else:
            logging.warning(f"未知信号类型: {signal_type}")
            return False
        
        if order_id:
            logging.info(f"下单成功: {stock_code} | 订单ID: {order_id} | 数量: {order_volume}")
            # 更新交易次数计数
            self.update_trade_counts(signal_type)
            
            self.processed_signals.add(signal['raw_line'])
            self.executed_trades += 1
            if len(self.processed_signals) > 1000:
                self.processed_signals = set(list(self.processed_signals)[-500:])
            return True
        else:
            logging.error("下单失败,订单ID为空")
            return False
            
    except Exception as e:
        logging.error(f"交易执行异常: {str(e)}")
        import traceback
        logging.error(f"详细错误信息: {traceback.format_exc()}")
        return False
好的指标等于至高的阵地,明察秋毫自然马到成功; 微信手机同号:15907742318 老帅李华杰
老帅李华杰
管理员
管理员
  • UID2
  • 粉丝33
  • 关注0
  • 发帖数433
  • 社区居民
  • 忠实会员
  • 喜欢达人
  • 原创写手
地板#
发布于:2025-12-05 19:06
修改止损止盈执行价格
def execute_stop_loss_take_profit(self, stock_code, reason, message):
    """执行止损止盈"""
    try:
        if stock_code not in self.position_records:
            logging.warning(f"尝试执行{reason}但无持仓记录: {stock_code}")
            return False
        
        position = self.position_records[stock_code]
        sell_volume = position['volume']
        
        if sell_volume <= 0:
            logging.warning(f"{stock_code} 持仓数量为0,无法执行{reason}")
            return False
        
        # 获取当前价格
        current_price = self.get_current_price_from_qmt(stock_code)
        
        if current_price <= 0:
            # 如果无法获取当前价格,使用成本价
            avg_price = position['avg_price']
            if avg_price <= 0:
                logging.error(f"无法确定{stock_code}的卖出价格")
                return False
            current_price = avg_price
        
        # 计算卖出价格:当前价下浮1%,但不低于跌停价
        limit_prices = self.get_stock_limit_prices(stock_code)
        
        if self.price_strategy['use_market_price']:
            # 市价单
            order_price = 0
            price_type = xtconstant.LATEST_PRICE
            price_desc = "市价"
        else:
            # 限价单 - 当前价下浮1%
            base_price = current_price * (1 - self.price_strategy['sell_discount'])
            
            # 确保不低于跌停价
            if base_price < limit_prices['lower']:
                base_price = limit_prices['lower']
                logging.info(f"止损止盈卖出价格调整为跌停价: {base_price:.2f}")
            
            order_price = self.adjust_price_to_tick(base_price)
            price_type = xtconstant.FIX_PRICE
            price_desc = f"限价{order_price:.2f}"
        
        # 验证卖出价格
        if price_type == xtconstant.FIX_PRICE and order_price <= 0:
            logging.error(f"限价单卖出价格无效: {order_price:.2f}")
            return False
        
        logging.info(f"执行{reason}: {stock_code} {message} | 持仓: {sell_volume}股 | 价格: {price_desc}")
        
        # 执行卖出
        order_id = self.trader.order_stock(
            self.account, stock_code, xtconstant.STOCK_SELL,
            sell_volume, price_type, order_price,
            'StopLossTakeProfit', f"{reason}: {message}"
        )
        
        if order_id:
            logging.info(f"{reason}下单成功: {stock_code} | 订单ID: {order_id} | 数量: {sell_volume}")
            
            # 更新统计
            if reason == "止损":
                self.stop_loss_triggered += 1
            elif reason == "止盈":
                self.take_profit_triggered += 1
            
            # 移除持仓记录
            self.remove_position_record(stock_code, sell_volume)
            return True
        else:
            logging.error(f"{reason}下单失败,订单ID为空")
            return False
            
    except Exception as e:
        logging.error(f"执行{reason}异常: {str(e)}")
        import traceback
        logging.error(traceback.format_exc())
        return False
好的指标等于至高的阵地,明察秋毫自然马到成功; 微信手机同号:15907742318 老帅李华杰
老帅李华杰
管理员
管理员
  • UID2
  • 粉丝33
  • 关注0
  • 发帖数433
  • 社区居民
  • 忠实会员
  • 喜欢达人
  • 原创写手
4楼#
发布于:2025-12-05 19:06
添加价格合理性检查
def validate_order_price(self, stock_code, price, signal_type):
    """验证委托价格的合理性"""
    try:
        if price <= 0:
            return False, "价格必须大于0"
        
        # 获取涨跌停价
        limit_prices = self.get_stock_limit_prices(stock_code)
        
        if signal_type == "买入":
            if price > limit_prices['upper']:
                return False, f"价格{price:.2f}超过涨停价{limit_prices['upper']:.2f}"
            # 检查是否明显低于当前市价(可能是错误的价格)
            current_price = self.get_current_price_from_qmt(stock_code)
            if current_price > 0 and price < current_price * 0.9:  # 低于市价10%
                logging.warning(f"买入价格{price:.2f}明显低于当前价{current_price:.2f}")
        
        elif signal_type == "卖出":
            if price < limit_prices['lower']:
                return False, f"价格{price:.2f}低于跌停价{limit_prices['lower']:.2f}"
            # 检查是否明显高于当前市价
            current_price = self.get_current_price_from_qmt(stock_code)
            if current_price > 0 and price > current_price * 1.1:  # 高于市价10%
                logging.warning(f"卖出价格{price:.2f}明显高于当前价{current_price:.2f}")
        
        return True, "价格合理"
        
    except Exception as e:
        logging.error(f"验证委托价格异常: {str(e)}")
        return False, f"验证异常: {str(e)}"
好的指标等于至高的阵地,明察秋毫自然马到成功; 微信手机同号:15907742318 老帅李华杰
老帅李华杰
管理员
管理员
  • UID2
  • 粉丝33
  • 关注0
  • 发帖数433
  • 社区居民
  • 忠实会员
  • 喜欢达人
  • 原创写手
5楼#
发布于:2025-12-05 19:07
修改主函数中的参数设置
# 在main函数的价格策略设置部分,添加涨跌停价检查选项
def main():
    """主函数"""
    layout = [
        # ... 原有布局 ...
        
        [sg.HorizontalSeparator()],
        [sg.Text('价格策略设置:')],
        [sg.Text('买入上浮(%):'), sg.Input("1.0", key="BUY_PREMIUM", size=(10,1))],
        [sg.Text('卖出下浮(%):'), sg.Input("1.0", key="SELL_DISCOUNT", size=(10,1))],
        [sg.Checkbox('使用市价单(不选中则为限价单)', default=False, key="USE_MARKET")],
        [sg.Checkbox('启用涨跌停价限制', default=True, key="ENABLE_PRICE_LIMIT")],
        
        # ... 其他布局 ...
    ]
    
    # ... 原有代码 ...
    
    # 在参数设置部分添加
    monitor_trader.enable_price_limit = values["ENABLE_PRICE_LIMIT"]
    
    # ... 原有代码 ...
好的指标等于至高的阵地,明察秋毫自然马到成功; 微信手机同号:15907742318 老帅李华杰
老帅李华杰
管理员
管理员
  • UID2
  • 粉丝33
  • 关注0
  • 发帖数433
  • 社区居民
  • 忠实会员
  • 喜欢达人
  • 原创写手
6楼#
发布于:2025-12-05 19:07
修改涨跌停价获取函数,添加开关
def get_stock_limit_prices(self, stock_code):
    """获取股票的涨停价和跌停价"""
    try:
        # 如果禁用了涨跌停价限制,返回一个非常大的范围
        if not hasattr(self, 'enable_price_limit') or not self.enable_price_limit:
            return {'upper': 999999, 'lower': 0.01}
        
        # ... 原有代码 ...
好的指标等于至高的阵地,明察秋毫自然马到成功; 微信手机同号:15907742318 老帅李华杰
老帅李华杰
管理员
管理员
  • UID2
  • 粉丝33
  • 关注0
  • 发帖数433
  • 社区居民
  • 忠实会员
  • 喜欢达人
  • 原创写手
7楼#
发布于:2025-12-05 19:08
策略优势:
更符合实际交易:按照预警价格上下浮动1%进行交易,而不是使用市价单或固定价格

风险控制更好:确保交易价格不会超过涨跌停价,避免无效委托

提高成交概率:买入时在预警价基础上上浮,卖出时下浮,更容易成交

防止价格异常:通过涨跌停价检查,避免因价格异常导致的错误交易
好的指标等于至高的阵地,明察秋毫自然马到成功; 微信手机同号:15907742318 老帅李华杰
游客

返回顶部