|
阅读:47回复:5
止损止盈检查函数
def check_stop_loss_take_profit(self, stock_code, current_price):
"""检查止损止盈""" try: if not self.stop_loss_take_profit['enabled']: return None, "" if stock_code not in self.position_records: return None, "" position = self.position_records[stock_code] avg_price = position['avg_price'] # 检查价格是否有效 if avg_price <= 0: logging.warning(f"持仓{stock_code}的平均成本价为{avg_price:.2f},无法计算止损止盈") return None, "" if current_price <= 0: logging.warning(f"持仓{stock_code}的当前价格为{current_price:.2f},无法计算止损止盈") return None, "" highest_price = position.get('highest_price', avg_price) # 更新最高价 if current_price > highest_price: position['highest_price'] = current_price highest_price = current_price # 计算盈亏比例 profit_ratio = (current_price - avg_price) / avg_price # 检查止损 stop_loss_rate = self.stop_loss_take_profit['stop_loss_rate'] if stop_loss_rate > 0 and profit_ratio <= -stop_loss_rate: return "止损", f"当前亏损{abs(profit_ratio)*100:.1f}%,超过止损阈值{stop_loss_rate*100:.1f}%" # 检查止盈 take_profit_rate = self.stop_loss_take_profit['take_profit_rate'] if take_profit_rate > 0 and profit_ratio >= take_profit_rate: return "止盈", f"当前盈利{profit_ratio*100:.1f}%,达到止盈阈值{take_profit_rate*100:.1f}%" # 检查移动止损 if self.stop_loss_take_profit['trailing_stop_enabled']: trailing_stop_rate = self.stop_loss_take_profit['trailing_stop_rate'] if trailing_stop_rate > 0 and highest_price > 0: drawdown_from_high = (highest_price - current_price) / highest_price if drawdown_from_high >= trailing_stop_rate: return "移动止损", f"从最高点回撤{drawdown_from_high*100:.1f}%,超过移动止损阈值{trailing_stop_rate*100:.1f}%" return None, "" except ZeroDivisionError as e: logging.warning(f"止损止盈计算时除零错误: stock={stock_code}, avg_price={avg_price if 'avg_price' in locals() else 'N/A'}, current_price={current_price}") return None, "" except Exception as e: logging.error(f"检查止损止盈异常: {str(e)}") import traceback logging.error(traceback.format_exc()) return None, "" |
|
|
|
沙发#
发布于:2025-12-05 19:01
改进获取当前价格的函数
def get_current_price_from_qmt(self, stock_code):
"""从QMT获取当前价格""" try: # 尝试从QMT接口获取实时行情 # 注意:需要根据实际QMT接口调整 # 方法1: 使用xtdata获取实时行情 try: from xtquant import xtdata quote = xtdata.get_full_tick([stock_code]) if stock_code in quote and 'lastPrice' in quote[stock_code]: current_price = quote[stock_code]['lastPrice'] if current_price > 0: return current_price except ImportError: pass except Exception as e: logging.warning(f"通过xtdata获取价格失败: {stock_code}, {str(e)}") # 方法2: 查询最近成交价 try: from xtquant.xttype import StockData # 这里需要根据实际QMT接口实现 # 示例代码,需要调整 pass except: pass # 方法3: 使用默认价格(最后一次买入价格) if stock_code in self.position_records: avg_price = self.position_records[stock_code]['avg_price'] if avg_price > 0: return avg_price # 方法4: 从监控文件中获取最新价格 try: if os.path.exists(self.monitor_file_path): with open(self.monitor_file_path, 'r', encoding='gbk') as f: lines = f.readlines() for line in reversed(lines): # 从后往前查找最新价格 if stock_code[:6] in line: # 匹配股票代码 parts = line.split() if len(parts) >= 4: try: price = float(parts[3]) if price > 0: return price except: pass except: pass logging.warning(f"无法获取{stock_code}的当前价格,使用默认值10.0") return 10.0 # 返回一个安全的默认值 except Exception as e: logging.error(f"获取当前价格异常: {stock_code}, {str(e)}") return 10.0 # 返回一个安全的默认值 |
|
|
|
板凳#
发布于:2025-12-05 19:02
改进检查所有持仓的止损止盈函数
def check_all_positions_stop_loss_take_profit(self):
"""检查所有持仓的止损止盈""" try: if not self.stop_loss_take_profit['enabled']: return current_time = datetime.now() # 检查是否需要执行止损止盈检查 if self.last_stop_check_time and \ (current_time - self.last_stop_check_time).seconds < self.stop_loss_take_profit['check_interval']: return self.last_stop_check_time = current_time # 获取当前持仓 positions = self.trader.query_stock_positions(self.account) if not positions: return logging.info(f"开始检查持仓止损止盈,共{len(positions)}个持仓") for pos in positions: stock_code = pos.stock_code # 跳过无效的持仓 if not stock_code or pos.volume <= 0: continue # 获取当前价格 current_price = self.get_current_price_from_qmt(stock_code) if current_price <= 0: logging.warning(f"持仓{stock_code}当前价格无效: {current_price}") continue # 检查止损止盈 action, message = self.check_stop_loss_take_profit(stock_code, current_price) if action: # 执行止损止盈 success = self.execute_stop_loss_take_profit(stock_code, action, message) if success: logging.info(f"{action}执行成功: {stock_code}") else: logging.warning(f"{action}执行失败: {stock_code}") except Exception as e: logging.error(f"检查所有持仓止损止盈异常: {str(e)}") import traceback logging.error(traceback.format_exc()) |
|
|
|
地板#
发布于:2025-12-05 19:02
修复执行止损止盈函数
def execute_stop_loss_take_profit(self, stock_code, reason, message):
"""执行止损止盈""" try: if stock_code not in self.position_records: logging.warning(f"尝试执行{reason}但无持仓记录: {stock_code}") return False position = self.position_records[stock_code] sell_volume = position['volume'] if sell_volume <= 0: logging.warning(f"{stock_code} 持仓数量为0,无法执行{reason}") return False # 获取当前价格 current_price = self.get_current_price_from_qmt(stock_code) # 计算卖出价格 if self.price_strategy['use_market_price']: # 市价单 order_price = 0 price_type = xtconstant.LATEST_PRICE price_desc = "市价" else: # 限价单 if current_price <= 0: # 如果无法获取当前价格,使用成本价 avg_price = position['avg_price'] if avg_price <= 0: logging.error(f"无法确定{stock_code}的卖出价格") return False # 按成本价的95%卖出 order_price = avg_price * 0.95 logging.warning(f"使用成本价95%作为卖出价: {order_price:.2f}") else: # 计算卖出价格(比当前价低1%以确保成交) order_price = current_price * (1 - self.price_strategy['sell_discount']) order_price = self.adjust_price_to_tick(order_price) price_type = xtconstant.FIX_PRICE price_desc = f"限价{order_price:.2f}" # 验证卖出价格 if price_type == xtconstant.FIX_PRICE and order_price <= 0: logging.error(f"限价单卖出价格无效: {order_price:.2f}") return False logging.info(f"执行{reason}: {stock_code} {message} | 持仓: {sell_volume}股 | 价格: {price_desc}") # 执行卖出 order_id = self.trader.order_stock( self.account, stock_code, xtconstant.STOCK_SELL, sell_volume, price_type, order_price, 'StopLossTakeProfit', f"{reason}: {message}" ) if order_id: logging.info(f"{reason}下单成功: {stock_code} | 订单ID: {order_id} | 数量: {sell_volume}") # 更新统计 if reason == "止损": self.stop_loss_triggered += 1 elif reason == "止盈": self.take_profit_triggered += 1 # 移除持仓记录 self.remove_position_record(stock_code, sell_volume) return True else: logging.error(f"{reason}下单失败,订单ID为空") return False except Exception as e: logging.error(f"执行{reason}异常: {str(e)}") import traceback logging.error(traceback.format_exc()) return False |
|
|
|
4楼#
发布于:2025-12-05 19:03
在更新持仓记录时添加验证
def update_position_record(self, stock_code, buy_price, buy_volume):
"""更新持仓记录""" try: # 验证输入参数 if buy_price <= 0: logging.warning(f"买入价格无效: {buy_price:.2f},不更新持仓记录") return if buy_volume <= 0: logging.warning(f"买入数量无效: {buy_volume},不更新持仓记录") return current_time = datetime.now() if stock_code in self.position_records: # 已有持仓,计算平均成本 old_record = self.position_records[stock_code] # 验证旧的记录 if old_record['avg_price'] <= 0: logging.warning(f"旧持仓{stock_code}的平均成本价为{old_record['avg_price']:.2f},重新设置") # 如果旧的平均成本无效,直接使用新的买入价 self.position_records[stock_code] = { 'avg_price': buy_price, 'volume': buy_volume, 'buy_time': current_time, 'last_update': current_time, 'highest_price': buy_price } else: # 正常计算平均成本 old_total_value = old_record['avg_price'] * old_record['volume'] new_total_value = buy_price * buy_volume total_volume = old_record['volume'] + buy_volume # 避免除零错误 if total_volume > 0: avg_price = (old_total_value + new_total_value) / total_volume else: avg_price = buy_price self.position_records[stock_code] = { 'avg_price': avg_price, 'volume': total_volume, 'buy_time': old_record['buy_time'], # 保留最早的买入时间 'last_update': current_time, 'highest_price': max(old_record.get('highest_price', avg_price), buy_price) } logging.info(f"更新持仓记录: {stock_code} 平均成本: {avg_price:.2f}, 持仓: {total_volume}") else: # 新持仓 self.position_records[stock_code] = { 'avg_price': buy_price, 'volume': buy_volume, 'buy_time': current_time, 'last_update': current_time, 'highest_price': buy_price } logging.info(f"新增持仓记录: {stock_code} 成本: {buy_price:.2f}, 持仓: {buy_volume}") except ZeroDivisionError as e: logging.error(f"更新持仓记录时除零错误: stock={stock_code}, buy_price={buy_price}, buy_volume={buy_volume}") except Exception as e: logging.error(f"更新持仓记录异常: {str(e)}") import traceback logging.error(traceback.format_exc()) |
|
|
|
5楼#
发布于:2025-12-05 19:03
在主循环中添加异常处理
def run(self, check_interval=10):
"""运行监控主循环""" logging.info("启动通达信预警监控系统...") if not self.initialize_trader(): logging.error("交易接口初始化失败,无法启动监控") return self.initialize_monitor() self.query_account_status() logging.info("监控系统运行中,按 Ctrl+C 退出...") try: error_count = 0 max_error_count = 10 while True: try: current_time = datetime.now() current_time_only = current_time.time() if self.is_trading_time(): # 检查文件信号 self.check_monitor_file() # 检查所有持仓的止损止盈 self.check_all_positions_stop_loss_take_profit() if not self.last_check_time or (current_time - self.last_check_time).seconds >= 3600: self.query_account_status() self.print_performance_report() self.last_check_time = current_time # 重置错误计数 error_count = 0 else: close_time = dt_time(15, 5) open_time = dt_time(9, 25) if current_time_only >= close_time or current_time_only <= open_time: if self.last_position != 0: self.last_position = 0 self.processed_signals.clear() logging.info("非交易时间,重置文件监控状态") time_module.sleep(check_interval) except Exception as e: error_count += 1 logging.error(f"主循环异常 ({error_count}/{max_error_count}): {str(e)}") import traceback logging.error(traceback.format_exc()) if error_count >= max_error_count: logging.error(f"错误次数超过限制({max_error_count}),停止监控") break time_module.sleep(5) # 出错后等待5秒再继续 except KeyboardInterrupt: logging.info("用户终止监控") self.print_performance_report() except Exception as e: logging.error(f"监控运行异常: {str(e)}") import traceback logging.error(traceback.format_exc()) finally: logging.info("监控系统停止") |
|
|