"""MCP server exposing tools to manage VNC/Chrome instances and run spiders.

Every tool returns a JSON string containing at least a ``status`` field
(200 on success, 500 on failure). Failures also carry a ``message`` field.
"""

import asyncio
import json
import logging
import os
import re
import subprocess
from typing import Optional

from fastmcp import FastMCP

from spider.mail_qq import start as start_mail_qq_spider

# Logging is written to stderr, so it does not pollute stdout.
logging.basicConfig(level=logging.INFO)
logging.info("服务器启动")

mcp = FastMCP("spider-server")

VNC_SERVER_HOST = "https://10.10.40.19:6080"
SHELL_DIR = "/home/dgs/vnc-server"

# Shell pipeline used to report the first address printed by `hostname -I`.
_PUBLIC_IP_COMMAND = "hostname -I | awk '{print $1}'"
# Marker that the status output of browser-vnc.sh uses for a live instance,
# e.g. "Instance 1: ✅ Running".
_RUNNING_MARKER = "✅ Running"
_INSTANCE_RE = re.compile(r"Instance (\d+):")


def _to_json(data: dict) -> str:
    """Serialize a response dict, keeping non-ASCII text readable."""
    return json.dumps(data, ensure_ascii=False)


def _error_response(data: dict, exc: Exception) -> str:
    """Mark *data* as failed with the exception text and serialize it.

    Must be called from inside an ``except`` block so that the traceback is
    available to ``logging.exception``.
    """
    logging.exception("tool call failed: %s", exc)
    data["status"] = 500
    data["message"] = str(exc)
    return _to_json(data)


async def _run_script(script: str, *args: str) -> "tuple[Optional[int], str, str]":
    """Run ``SHELL_DIR/script`` with *args* and capture its output.

    Returns:
        A ``(returncode, stdout, stderr)`` tuple with both streams decoded.
    """
    process = await asyncio.create_subprocess_exec(
        f"{SHELL_DIR}/{script}",
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await process.communicate()
    # errors="replace": a stray undecodable byte in script output should not
    # turn an otherwise successful call into a failure.
    return (
        process.returncode,
        stdout.decode(errors="replace"),
        stderr.decode(errors="replace"),
    )


def _record_script_result(
    data: dict, returncode: Optional[int], output: str, error_output: str
) -> bool:
    """Store the script outcome in *data* and return True on success."""
    succeeded = returncode == 0
    if succeeded:
        data["message"] = output
        data["status"] = 200
    else:
        data["message"] = error_output or output or "Unknown error"
        data["status"] = 500
    return succeeded


async def _get_public_ip() -> str:
    """Return the host address reported by ``hostname -I`` (first entry)."""
    # subprocess.getoutput blocks, so keep it off the event loop.
    raw = await asyncio.to_thread(subprocess.getoutput, _PUBLIC_IP_COMMAND)
    return raw.strip()


def _vnc_url(token_id) -> str:
    """Build the noVNC URL whose token is ``user<token_id>``."""
    return f"{VNC_SERVER_HOST}/vnc_lite.html?path=?token=user{token_id}"


def _vnc_port(instance: int) -> int:
    """Instance 1 maps to port 5900, instance 2 to 5901, and so on."""
    return 5900 + (instance - 1)


def _display_number(instance: int) -> int:
    """Instance 1 maps to display 99, instance 2 to 100, and so on."""
    return 98 + instance


@mcp.tool()
async def mail_qq_spider(account: str, display: int) -> str:
    """Run the QQ mail spider.

    Args:
        account: QQ mail user name.
        display: Desktop display number.

    Returns:
        str: JSON string with ``display``, ``status`` and, on success,
        ``result``; on failure ``message`` holds the error text.
    """
    data = {"display": display}
    try:
        # The spider entry point is synchronous; run it in a worker thread.
        result = await asyncio.to_thread(start_mail_qq_spider, account, display)
        data["status"] = 200
        data["result"] = result
        return _to_json(data)
    except Exception as e:
        # If serializing the result was what failed, it must not be
        # serialized again in the error response.
        data.pop("result", None)
        return _error_response(data, e)


@mcp.tool()
async def start_xvnc_server(instance: int = 1, password: str = "123456") -> str:
    """Start an independent VNC + Chrome instance.

    Args:
        instance: Instance number (1-10); display number and port are
            derived from it.
            Instance 1: display=99, VNC port=5900
            Instance 2: display=100, VNC port=5901
            and so on.
        password: VNC connection password, default 123456.

    Returns:
        JSON string containing the instance information.
    """
    data = {
        "instance": instance,
        "status": 200,
        "action": "start",
    }
    try:
        returncode, output, error_output = await _run_script(
            "browser-vnc.sh", "start", str(instance), password
        )
        if _record_script_result(data, returncode, output, error_output):
            data["vnc_url"] = _vnc_url(instance)
            data["vnc_port"] = _vnc_port(instance)
            data["display"] = _display_number(instance)
            data["password"] = password
            data["public_ip"] = await _get_public_ip()
        return _to_json(data)
    except Exception as e:
        return _error_response(data, e)


@mcp.tool()
async def stop_xvnc_server(instance: int = 1, keep_data: bool = False) -> str:
    """Stop the given VNC instance.

    Args:
        instance: Instance number (1-10).
        keep_data: Whether to keep user data; default False (delete data).

    Returns:
        JSON string with the operation result.
    """
    data = {
        "instance": instance,
        "status": 200,
        "action": "stop",
        "keep_data": keep_data,
    }
    try:
        cmd = "stop-keep-data" if keep_data else "stop"
        returncode, output, error_output = await _run_script(
            "browser-vnc.sh", cmd, str(instance)
        )
        _record_script_result(data, returncode, output, error_output)
        return _to_json(data)
    except Exception as e:
        return _error_response(data, e)


@mcp.tool()
async def stop_all_xvnc_servers(keep_data: bool = False) -> str:
    """Stop all VNC instances.

    Args:
        keep_data: Whether to keep user data; default False (delete all data).

    Returns:
        JSON string with the operation result.
    """
    data = {
        "status": 200,
        "action": "stop_all",
        "keep_data": keep_data,
    }
    try:
        cmd = "stop-all-keep-data" if keep_data else "stop-all"
        returncode, output, error_output = await _run_script("browser-vnc.sh", cmd)
        _record_script_result(data, returncode, output, error_output)
        return _to_json(data)
    except Exception as e:
        return _error_response(data, e)


@mcp.tool()
async def list_xvnc_servers() -> str:
    """List all running VNC instances.

    Returns:
        JSON string with ``instances`` (one entry per running instance),
        ``count`` and the script's ``raw_output``.
    """
    data = {
        "status": 200,
        "action": "list",
        "instances": [],
    }
    try:
        # NOTE(review): the script's return code is not checked here, as in
        # the original; confirm whether "status" can exit non-zero.
        _, output, _ = await _run_script("browser-vnc.sh", "status")

        # Pick out lines such as "Instance 1: ✅ Running".
        for line in output.split("\n"):
            if _RUNNING_MARKER not in line:
                continue
            match = _INSTANCE_RE.search(line)
            if match is None:
                continue
            instance = int(match.group(1))
            data["instances"].append(
                {
                    "instance": instance,
                    "vnc_port": _vnc_port(instance),
                    "display": _display_number(instance),
                    "vnc_url": _vnc_url(instance),
                }
            )

        data["count"] = len(data["instances"])
        data["raw_output"] = output
        return _to_json(data)
    except Exception as e:
        return _error_response(data, e)


@mcp.tool()
async def get_vnc_connection_info(instance: int = 1) -> str:
    """Get the connection information for the given instance.

    Args:
        instance: Instance number (1-10).

    Returns:
        JSON string containing the connection information, plus a
        ``warning`` when the instance is not reported as running.
    """
    data = {
        "instance": instance,
        "status": 200,
    }
    try:
        _, output, _ = await _run_script("browser-vnc.sh", "status")

        # Only the first status line mentioning this instance is considered.
        instance_pattern = f"Instance {instance}:"
        is_running = False
        for line in output.split("\n"):
            if instance_pattern in line:
                is_running = _RUNNING_MARKER in line
                break

        data["is_running"] = is_running
        data["vnc_url"] = _vnc_url(instance)
        data["vnc_port"] = _vnc_port(instance)
        data["display"] = _display_number(instance)
        data["public_ip"] = await _get_public_ip()

        if not is_running:
            data["warning"] = "Instance is not running, start it with start_xvnc_server"

        return _to_json(data)
    except Exception as e:
        return _error_response(data, e)


@mcp.tool()
async def cleanup_xvnc_data(instance: Optional[int] = None) -> str:
    """Clean up the user data of VNC instances.

    Args:
        instance: Instance number; if None, data of all instances is cleaned.

    Returns:
        JSON string with the operation result.
    """
    data = {
        "status": 200,
        "action": "cleanup",
    }
    try:
        if instance is not None:
            args = ("cleanup", str(instance))
            data["instance"] = instance
        else:
            args = ("cleanup-all",)

        returncode, output, error_output = await _run_script("browser-vnc.sh", *args)
        _record_script_result(data, returncode, output, error_output)
        return _to_json(data)
    except Exception as e:
        return _error_response(data, e)


@mcp.tool()
async def start_vnc_server(display: int) -> str:
    """Start the VNC service.

    Args:
        display: VNC display number.

    Returns:
        JSON string with ``display``, ``message``, ``status`` and, on
        success, ``vnc_url``.
    """
    data = {"display": display}
    try:
        returncode, output, error_output = await _run_script(
            "start_vnc_server.sh", str(display)
        )
        if _record_script_result(data, returncode, output, error_output):
            data["vnc_url"] = _vnc_url(display)
        return _to_json(data)
    except Exception as e:
        return _error_response(data, e)


@mcp.tool()
async def start_spider(spider_code: str, display: str) -> str:
    """Start a spider.

    Args:
        spider_code: Spider code.
        display: Desktop display number.

    Returns:
        JSON string with ``display``, ``message`` and ``status``.
    """
    data = {"display": display}
    try:
        # str(): subprocess arguments must be strings even if a caller
        # passes the display number as an int.
        returncode, output, error_output = await _run_script(
            "start_spider.sh", spider_code, str(display)
        )
        _record_script_result(data, returncode, output, error_output)
        return _to_json(data)
    except Exception as e:
        return _error_response(data, e)


@mcp.tool()
async def close_vnc_server(display: int) -> str:
    """Shut down the VNC service.

    Args:
        display: VNC display number.

    Returns:
        JSON string with ``display``, ``message`` and ``status``.
    """
    data = {"display": display}
    try:
        returncode, output, error_output = await _run_script(
            "close_vnc_server.sh", str(display)
        )
        _record_script_result(data, returncode, output, error_output)
        return _to_json(data)
    except Exception as e:
        return _error_response(data, e)


@mcp.resource("data://spider_code")
def get_spider_code_list() -> str:
    """Return the known spiders as a JSON list of code/name/auth_name."""
    spider_list = [
        {"code": "01", "name": "阳光采购爬虫", "auth_name": "mail"},
        {"code": "02", "name": "QQ邮箱爬虫", "auth_name": "mail"},
    ]
    return json.dumps(spider_list, ensure_ascii=False)


# Run the server.
if __name__ == "__main__":
    # SSE transport is used; "stdio" is the alternative transport.
    mcp.run(transport="sse")