老帅李华杰
管理员
管理员
  • UID2
  • 粉丝33
  • 关注0
  • 发帖数433
  • 社区居民
  • 忠实会员
  • 喜欢达人
  • 原创写手
阅读:47回复:5

止损止盈检查函数

楼主#
更多 发布于:2025-12-05 19:00
def check_stop_loss_take_profit(self, stock_code, current_price):
    """检查止损止盈"""
    try:
        if not self.stop_loss_take_profit['enabled']:
            return None, ""
        
        if stock_code not in self.position_records:
            return None, ""
        
        position = self.position_records[stock_code]
        avg_price = position['avg_price']
        
        # 检查价格是否有效
        if avg_price <= 0:
            logging.warning(f"持仓{stock_code}的平均成本价为{avg_price:.2f},无法计算止损止盈")
            return None, ""
        
        if current_price <= 0:
            logging.warning(f"持仓{stock_code}的当前价格为{current_price:.2f},无法计算止损止盈")
            return None, ""
        
        highest_price = position.get('highest_price', avg_price)
        
        # 更新最高价
        if current_price > highest_price:
            position['highest_price'] = current_price
            highest_price = current_price
        
        # 计算盈亏比例
        profit_ratio = (current_price - avg_price) / avg_price
        
        # 检查止损
        stop_loss_rate = self.stop_loss_take_profit['stop_loss_rate']
        if stop_loss_rate > 0 and profit_ratio <= -stop_loss_rate:
            return "止损", f"当前亏损{abs(profit_ratio)*100:.1f}%,超过止损阈值{stop_loss_rate*100:.1f}%"
        
        # 检查止盈
        take_profit_rate = self.stop_loss_take_profit['take_profit_rate']
        if take_profit_rate > 0 and profit_ratio >= take_profit_rate:
            return "止盈", f"当前盈利{profit_ratio*100:.1f}%,达到止盈阈值{take_profit_rate*100:.1f}%"
        
        # 检查移动止损
        if self.stop_loss_take_profit['trailing_stop_enabled']:
            trailing_stop_rate = self.stop_loss_take_profit['trailing_stop_rate']
            if trailing_stop_rate > 0 and highest_price > 0:
                drawdown_from_high = (highest_price - current_price) / highest_price
                if drawdown_from_high >= trailing_stop_rate:
                    return "移动止损", f"从最高点回撤{drawdown_from_high*100:.1f}%,超过移动止损阈值{trailing_stop_rate*100:.1f}%"
        
        return None, ""
        
    except ZeroDivisionError as e:
        logging.warning(f"止损止盈计算时除零错误: stock={stock_code}, avg_price={avg_price if 'avg_price' in locals() else 'N/A'}, current_price={current_price}")
        return None, ""
    except Exception as e:
        logging.error(f"检查止损止盈异常: {str(e)}")
        import traceback
        logging.error(traceback.format_exc())
        return None, ""
好的指标等于至高的阵地,明察秋毫自然马到成功; 微信手机同号:15907742318 老帅李华杰
老帅李华杰
管理员
管理员
  • UID2
  • 粉丝33
  • 关注0
  • 发帖数433
  • 社区居民
  • 忠实会员
  • 喜欢达人
  • 原创写手
沙发#
发布于:2025-12-05 19:01
改进获取当前价格的函数
def get_current_price_from_qmt(self, stock_code):
    """从QMT获取当前价格"""
    try:
        # 尝试从QMT接口获取实时行情
        # 注意:需要根据实际QMT接口调整
        
        # 方法1: 使用xtdata获取实时行情
        try:
            from xtquant import xtdata
            quote = xtdata.get_full_tick([stock_code])
            if stock_code in quote and 'lastPrice' in quote[stock_code]:
                current_price = quote[stock_code]['lastPrice']
                if current_price > 0:
                    return current_price
        except ImportError:
            pass
        except Exception as e:
            logging.warning(f"通过xtdata获取价格失败: {stock_code}, {str(e)}")
        
        # 方法2: 查询最近成交价
        try:
            from xtquant.xttype import StockData
            # 这里需要根据实际QMT接口实现
            # 示例代码,需要调整
            pass
        except:
            pass
        
        # 方法3: 使用默认价格(最后一次买入价格)
        if stock_code in self.position_records:
            avg_price = self.position_records[stock_code]['avg_price']
            if avg_price > 0:
                return avg_price
        
        # 方法4: 从监控文件中获取最新价格
        try:
            if os.path.exists(self.monitor_file_path):
                with open(self.monitor_file_path, 'r', encoding='gbk') as f:
                    lines = f.readlines()
                    for line in reversed(lines):  # 从后往前查找最新价格
                        if stock_code[:6] in line:  # 匹配股票代码
                            parts = line.split()
                            if len(parts) >= 4:
                                try:
                                    price = float(parts[3])
                                    if price > 0:
                                        return price
                                except:
                                    pass
        except:
            pass
        
        logging.warning(f"无法获取{stock_code}的当前价格,使用默认值10.0")
        return 10.0  # 返回一个安全的默认值
        
    except Exception as e:
        logging.error(f"获取当前价格异常: {stock_code}, {str(e)}")
        return 10.0  # 返回一个安全的默认值
好的指标等于至高的阵地,明察秋毫自然马到成功; 微信手机同号:15907742318 老帅李华杰
老帅李华杰
管理员
管理员
  • UID2
  • 粉丝33
  • 关注0
  • 发帖数433
  • 社区居民
  • 忠实会员
  • 喜欢达人
  • 原创写手
板凳#
发布于:2025-12-05 19:02
改进检查所有持仓的止损止盈函数
def check_all_positions_stop_loss_take_profit(self):
    """检查所有持仓的止损止盈"""
    try:
        if not self.stop_loss_take_profit['enabled']:
            return
        
        current_time = datetime.now()
        
        # 检查是否需要执行止损止盈检查
        if self.last_stop_check_time and \
           (current_time - self.last_stop_check_time).seconds < self.stop_loss_take_profit['check_interval']:
            return
        
        self.last_stop_check_time = current_time
        
        # 获取当前持仓
        positions = self.trader.query_stock_positions(self.account)
        if not positions:
            return
        
        logging.info(f"开始检查持仓止损止盈,共{len(positions)}个持仓")
        
        for pos in positions:
            stock_code = pos.stock_code
            
            # 跳过无效的持仓
            if not stock_code or pos.volume <= 0:
                continue
            
            # 获取当前价格
            current_price = self.get_current_price_from_qmt(stock_code)
            
            if current_price <= 0:
                logging.warning(f"持仓{stock_code}当前价格无效: {current_price}")
                continue
            
            # 检查止损止盈
            action, message = self.check_stop_loss_take_profit(stock_code, current_price)
            
            if action:
                # 执行止损止盈
                success = self.execute_stop_loss_take_profit(stock_code, action, message)
                if success:
                    logging.info(f"{action}执行成功: {stock_code}")
                else:
                    logging.warning(f"{action}执行失败: {stock_code}")
                        
    except Exception as e:
        logging.error(f"检查所有持仓止损止盈异常: {str(e)}")
        import traceback
        logging.error(traceback.format_exc())
好的指标等于至高的阵地,明察秋毫自然马到成功; 微信手机同号:15907742318 老帅李华杰
老帅李华杰
管理员
管理员
  • UID2
  • 粉丝33
  • 关注0
  • 发帖数433
  • 社区居民
  • 忠实会员
  • 喜欢达人
  • 原创写手
地板#
发布于:2025-12-05 19:02
修复执行止损止盈函数
def execute_stop_loss_take_profit(self, stock_code, reason, message):
    """执行止损止盈"""
    try:
        if stock_code not in self.position_records:
            logging.warning(f"尝试执行{reason}但无持仓记录: {stock_code}")
            return False
        
        position = self.position_records[stock_code]
        sell_volume = position['volume']
        
        if sell_volume <= 0:
            logging.warning(f"{stock_code} 持仓数量为0,无法执行{reason}")
            return False
        
        # 获取当前价格
        current_price = self.get_current_price_from_qmt(stock_code)
        
        # 计算卖出价格
        if self.price_strategy['use_market_price']:
            # 市价单
            order_price = 0
            price_type = xtconstant.LATEST_PRICE
            price_desc = "市价"
        else:
            # 限价单
            if current_price <= 0:
                # 如果无法获取当前价格,使用成本价
                avg_price = position['avg_price']
                if avg_price <= 0:
                    logging.error(f"无法确定{stock_code}的卖出价格")
                    return False
                # 按成本价的95%卖出
                order_price = avg_price * 0.95
                logging.warning(f"使用成本价95%作为卖出价: {order_price:.2f}")
            else:
                # 计算卖出价格(比当前价低1%以确保成交)
                order_price = current_price * (1 - self.price_strategy['sell_discount'])
            
            order_price = self.adjust_price_to_tick(order_price)
            price_type = xtconstant.FIX_PRICE
            price_desc = f"限价{order_price:.2f}"
        
        # 验证卖出价格
        if price_type == xtconstant.FIX_PRICE and order_price <= 0:
            logging.error(f"限价单卖出价格无效: {order_price:.2f}")
            return False
        
        logging.info(f"执行{reason}: {stock_code} {message} | 持仓: {sell_volume}股 | 价格: {price_desc}")
        
        # 执行卖出
        order_id = self.trader.order_stock(
            self.account, stock_code, xtconstant.STOCK_SELL,
            sell_volume, price_type, order_price,
            'StopLossTakeProfit', f"{reason}: {message}"
        )
        
        if order_id:
            logging.info(f"{reason}下单成功: {stock_code} | 订单ID: {order_id} | 数量: {sell_volume}")
            
            # 更新统计
            if reason == "止损":
                self.stop_loss_triggered += 1
            elif reason == "止盈":
                self.take_profit_triggered += 1
            
            # 移除持仓记录
            self.remove_position_record(stock_code, sell_volume)
            return True
        else:
            logging.error(f"{reason}下单失败,订单ID为空")
            return False
            
    except Exception as e:
        logging.error(f"执行{reason}异常: {str(e)}")
        import traceback
        logging.error(traceback.format_exc())
        return False
好的指标等于至高的阵地,明察秋毫自然马到成功; 微信手机同号:15907742318 老帅李华杰
老帅李华杰
管理员
管理员
  • UID2
  • 粉丝33
  • 关注0
  • 发帖数433
  • 社区居民
  • 忠实会员
  • 喜欢达人
  • 原创写手
4楼#
发布于:2025-12-05 19:03
在更新持仓记录时添加验证
def update_position_record(self, stock_code, buy_price, buy_volume):
    """更新持仓记录"""
    try:
        # 验证输入参数
        if buy_price <= 0:
            logging.warning(f"买入价格无效: {buy_price:.2f},不更新持仓记录")
            return
        
        if buy_volume <= 0:
            logging.warning(f"买入数量无效: {buy_volume},不更新持仓记录")
            return
        
        current_time = datetime.now()
        
        if stock_code in self.position_records:
            # 已有持仓,计算平均成本
            old_record = self.position_records[stock_code]
            
            # 验证旧的记录
            if old_record['avg_price'] <= 0:
                logging.warning(f"旧持仓{stock_code}的平均成本价为{old_record['avg_price']:.2f},重新设置")
                # 如果旧的平均成本无效,直接使用新的买入价
                self.position_records[stock_code] = {
                    'avg_price': buy_price,
                    'volume': buy_volume,
                    'buy_time': current_time,
                    'last_update': current_time,
                    'highest_price': buy_price
                }
            else:
                # 正常计算平均成本
                old_total_value = old_record['avg_price'] * old_record['volume']
                new_total_value = buy_price * buy_volume
                total_volume = old_record['volume'] + buy_volume
                
                # 避免除零错误
                if total_volume > 0:
                    avg_price = (old_total_value + new_total_value) / total_volume
                else:
                    avg_price = buy_price
                
                self.position_records[stock_code] = {
                    'avg_price': avg_price,
                    'volume': total_volume,
                    'buy_time': old_record['buy_time'],  # 保留最早的买入时间
                    'last_update': current_time,
                    'highest_price': max(old_record.get('highest_price', avg_price), buy_price)
                }
                logging.info(f"更新持仓记录: {stock_code} 平均成本: {avg_price:.2f}, 持仓: {total_volume}")
        else:
            # 新持仓
            self.position_records[stock_code] = {
                'avg_price': buy_price,
                'volume': buy_volume,
                'buy_time': current_time,
                'last_update': current_time,
                'highest_price': buy_price
            }
            logging.info(f"新增持仓记录: {stock_code} 成本: {buy_price:.2f}, 持仓: {buy_volume}")
            
    except ZeroDivisionError as e:
        logging.error(f"更新持仓记录时除零错误: stock={stock_code}, buy_price={buy_price}, buy_volume={buy_volume}")
    except Exception as e:
        logging.error(f"更新持仓记录异常: {str(e)}")
        import traceback
        logging.error(traceback.format_exc())
好的指标等于至高的阵地,明察秋毫自然马到成功; 微信手机同号:15907742318 老帅李华杰
老帅李华杰
管理员
管理员
  • UID2
  • 粉丝33
  • 关注0
  • 发帖数433
  • 社区居民
  • 忠实会员
  • 喜欢达人
  • 原创写手
5楼#
发布于:2025-12-05 19:03
在主循环中添加异常处理
def run(self, check_interval=10):
    """运行监控主循环"""
    logging.info("启动通达信预警监控系统...")
    
    if not self.initialize_trader():
        logging.error("交易接口初始化失败,无法启动监控")
        return
        
    self.initialize_monitor()
    self.query_account_status()
    
    logging.info("监控系统运行中,按 Ctrl+C 退出...")
    
    try:
        error_count = 0
        max_error_count = 10
        
        while True:
            try:
                current_time = datetime.now()
                current_time_only = current_time.time()
                
                if self.is_trading_time():
                    # 检查文件信号
                    self.check_monitor_file()
                    
                    # 检查所有持仓的止损止盈
                    self.check_all_positions_stop_loss_take_profit()
                    
                    if not self.last_check_time or (current_time - self.last_check_time).seconds >= 3600:
                        self.query_account_status()
                        self.print_performance_report()
                        self.last_check_time = current_time
                        
                    # 重置错误计数
                    error_count = 0
                else:
                    close_time = dt_time(15, 5)
                    open_time = dt_time(9, 25)
                    
                    if current_time_only >= close_time or current_time_only <= open_time:
                        if self.last_position != 0:
                            self.last_position = 0
                            self.processed_signals.clear()
                            logging.info("非交易时间,重置文件监控状态")
                
                time_module.sleep(check_interval)
                
            except Exception as e:
                error_count += 1
                logging.error(f"主循环异常 ({error_count}/{max_error_count}): {str(e)}")
                import traceback
                logging.error(traceback.format_exc())
                
                if error_count >= max_error_count:
                    logging.error(f"错误次数超过限制({max_error_count}),停止监控")
                    break
                    
                time_module.sleep(5)  # 出错后等待5秒再继续
                
    except KeyboardInterrupt:
        logging.info("用户终止监控")
        self.print_performance_report()
    except Exception as e:
        logging.error(f"监控运行异常: {str(e)}")
        import traceback
        logging.error(traceback.format_exc())
    finally:
        logging.info("监控系统停止")
好的指标等于至高的阵地,明察秋毫自然马到成功; 微信手机同号:15907742318 老帅李华杰
游客

返回顶部