import os from fastmcp import FastMCP from spider.mail_qq import start as start_mail_qq_spider import asyncio import json from typing import Optional import subprocess import logging logging.basicConfig(level=logging.INFO) # 日志会输出到 stderr,不会污染 stdout logging.info("服务器启动") mcp = FastMCP("spider-server") VNC_SERVER_HOST = "https://10.10.40.19:6080" # SHELL_DIR = "/home/dgs/vnc-server" SHELL_DIR = "./bin" # @mcp.tool() async def mail_qq_spider(account: str,display: int) -> str: """qq邮箱爬虫,第一个参数是用户名 Args: account: qq邮箱用户名 display: 显示桌面编号 Returns: str: 爬虫结果 """ data={"display": display} try: result = await asyncio.to_thread(start_mail_qq_spider, account,display) data["status"] = 200 data["result"] = result return json.dumps(data, ensure_ascii=False) except Exception as e: data["status"] = 500 return json.dumps(data, ensure_ascii=False) @mcp.tool() async def start_xvnc_server(instance: int = 1, password: str = "123456") -> str: """启动独立的 VNC + Chrome 实例 Args: instance: 实例编号 (1-10),会自动计算显示编号和端口 实例1: display=99, VNC端口=5900 实例2: display=100, VNC端口=5901 以此类推 password: VNC 连接密码,默认 123456 Returns: 包含实例信息的 JSON 字符串 """ data = { "instance": instance, "status": 200, "action": "start" } try: # 调用 browser-vnc.sh 脚本 process = await asyncio.create_subprocess_exec( f"{SHELL_DIR}/start-vnc.sh", "start", str(instance), password, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() output = stdout.decode() if process.returncode == 0: # 解析输出获取信息 public_ip = subprocess.getoutput("hostname -I | awk '{print $1}'") vnc_port = 5900 + (instance - 1) display_num = 98 + instance data["message"] = output data["vnc_url"] = f"{VNC_SERVER_HOST}/vnc_lite.html?path=?token=user{instance}" data["vnc_port"] = vnc_port data["display"] = display_num data["password"] = password data["public_ip"] = public_ip.strip() else: data["status"] = 500 data["message"] = stderr.decode() or output or "Unknown error" return json.dumps(data, ensure_ascii=False) except Exception as e: data["status"] = 500 data["message"] = str(e) return json.dumps(data, ensure_ascii=False) @mcp.tool() async def stop_xvnc_server(instance: int = 1, keep_data: bool = False) -> str: """停止指定的 VNC 实例 Args: instance: 实例编号 (1-10) keep_data: 是否保留用户数据,默认 False(删除数据) Returns: 操作结果 """ data = { "instance": instance, "status": 200, "action": "stop", "keep_data": keep_data } try: # 选择命令:stop 或 stop-keep-data cmd = "stop-keep-data" if keep_data else "stop" process = await asyncio.create_subprocess_exec( f"{SHELL_DIR}/browser-vnc.sh", cmd, str(instance), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() output = stdout.decode() if process.returncode == 0: data["message"] = output else: data["status"] = 500 data["message"] = stderr.decode() or output or "Unknown error" return json.dumps(data, ensure_ascii=False) except Exception as e: data["status"] = 500 data["message"] = str(e) return json.dumps(data, ensure_ascii=False) @mcp.tool() async def cleanup_xvnc_data(instance: Optional[int] = None) -> str: """清理 VNC 实例的用户数据 Args: instance: 实例编号,如果为 None 则清理所有实例的数据 Returns: 操作结果 """ data = { "status": 200, "action": "cleanup" } try: if instance is not None: # 清理指定实例 process = await asyncio.create_subprocess_exec( f"{SHELL_DIR}/browser-vnc.sh", "cleanup", str(instance), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) data["instance"] = instance else: # 清理所有实例 process = await asyncio.create_subprocess_exec( f"{SHELL_DIR}/browser-vnc.sh", "cleanup-all", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() output = stdout.decode() if process.returncode == 0: data["message"] = output else: data["status"] = 500 data["message"] = stderr.decode() or output or "Unknown error" return json.dumps(data, ensure_ascii=False) except Exception as e: data["status"] = 500 data["message"] = str(e) return json.dumps(data, ensure_ascii=False) @mcp.tool() async def start_vnc_server(display: int) -> str: """启动VNC服务 Args: display: VNC 显示编号 """ data={"display": display} try: # 调用外部 start_vnc_server.sh 脚本,传入 index 作为参数 process = await asyncio.create_subprocess_exec( f"{SHELL_DIR}/start_vnc_server.sh", str(display), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() data["message"] = stdout.decode() data["status"] = 200 if process.returncode == 0 else 500 if process.returncode == 0: data["vnc_url"] = f"{VNC_SERVER_HOST}/vnc_lite.html?path=?token=user{display}" return json.dumps(data, ensure_ascii=False) else: return json.dumps(data, ensure_ascii=False) except Exception as e: data["status"] = 500 return json.dumps(data, ensure_ascii=False) @mcp.tool() async def start_spider(spider_code: str,display: str) -> str: """启动爬虫 Args: spider_code: 爬虫代码 display: 显示桌面编号 """ data={"display": display} try: # 调用外部 start_spider.sh 脚本,传入 spider_code 作为参数 process = await asyncio.create_subprocess_exec( f"{SHELL_DIR}/start_spider.sh", spider_code, display, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() data["message"] = stdout.decode() data["status"] = 200 if process.returncode == 0 else 500 if process.returncode == 0: return json.dumps(data, ensure_ascii=False) else: return json.dumps(data, ensure_ascii=False) except Exception as e: data["status"] = 500 return json.dumps(data, ensure_ascii=False) @mcp.tool() async def close_vnc_server(display: int) -> str: """关闭 VNC 服务 Args: display: VNC 显示编号 """ data={"display": display} try: # 调用外部 close_vnc_server.sh 脚本,传入 display 作为参数 process = await asyncio.create_subprocess_exec( f"{SHELL_DIR}/close_vnc_server.sh", str(display), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() data["message"] = stdout.decode() data["status"] = 200 if process.returncode == 0 else 500 if process.returncode == 0: return json.dumps(data, ensure_ascii=False) else: return json.dumps(data, ensure_ascii=False) except Exception as e: data["status"] = 500 return json.dumps(data, ensure_ascii=False) @mcp.resource("data://spider_code") def get_spider_code_list() -> str: spider_list = [ {"code": "01","name": "阳光采购爬虫","auth_name":"mail"}, {"code": "02","name": "QQ邮箱爬虫","auth_name":"mail"}, ] return json.dumps(spider_list, ensure_ascii=False) # 运行服务器 if __name__ == "__main__": mcp.run(transport="sse") # mcp.run(transport="stdio")