import os from fastmcp import FastMCP from spider.mail_qq import start as start_mail_qq_spider import asyncio import json import logging logging.basicConfig(level=logging.INFO) # 日志会输出到 stderr,不会污染 stdout logging.info("服务器启动") mcp = FastMCP("spider-server") VNC_SERVER_HOST = "https://10.10.40.19:6080" SHELL_DIR = "/home/dgs/vnc-server" @mcp.tool() async def mail_qq_spider(account: str,display: int) -> str: """qq邮箱爬虫,第一个参数是用户名 Args: account: qq邮箱用户名 display: 显示桌面编号 Returns: str: 爬虫结果 """ data={"display": display} try: result = await asyncio.to_thread(start_mail_qq_spider, account,display) data["status"] = 200 data["result"] = result return json.dumps(data, ensure_ascii=False) except Exception as e: data["status"] = 500 return json.dumps(data, ensure_ascii=False)@mcp.tool() async def start_xvnc_server(display: int) -> str: """启动XVNC服务 Args: display: VNC 显示编号 """ data={"display": display} try: # 调用外部 start_browser.sh 脚本,传入 index 作为参数 process = await asyncio.create_subprocess_exec( f"{SHELL_DIR}/start_browser.sh", str("123456"), # VNC 密码 str(5900), # VNC 端口 str(99), # 显示编号 stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() data["message"] = stdout.decode() data["status"] = 200 if process.returncode == 0 else 500 if process.returncode == 0: data["vnc_url"] = f"{VNC_SERVER_HOST}/vnc_lite.html?path=?token=user{display}" return json.dumps(data, ensure_ascii=False) else: return json.dumps(data, ensure_ascii=False) except Exception as e: data["status"] = 500 return json.dumps(data, ensure_ascii=False) @mcp.tool() async def start_vnc_server(display: int) -> str: """启动VNC服务 Args: display: VNC 显示编号 """ data={"display": display} try: # 调用外部 start_vnc_server.sh 脚本,传入 index 作为参数 process = await asyncio.create_subprocess_exec( f"{SHELL_DIR}/start_vnc_server.sh", str(display), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() data["message"] = stdout.decode() data["status"] = 200 if process.returncode == 0 else 500 if process.returncode == 0: data["vnc_url"] = f"{VNC_SERVER_HOST}/vnc_lite.html?path=?token=user{display}" return json.dumps(data, ensure_ascii=False) else: return json.dumps(data, ensure_ascii=False) except Exception as e: data["status"] = 500 return json.dumps(data, ensure_ascii=False) @mcp.tool() async def start_spider(spider_code: str,display: str) -> str: """启动爬虫 Args: spider_code: 爬虫代码 display: 显示桌面编号 """ data={"display": display} try: # 调用外部 start_spider.sh 脚本,传入 spider_code 作为参数 process = await asyncio.create_subprocess_exec( f"{SHELL_DIR}/start_spider.sh", spider_code, display, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() data["message"] = stdout.decode() data["status"] = 200 if process.returncode == 0 else 500 if process.returncode == 0: return json.dumps(data, ensure_ascii=False) else: return json.dumps(data, ensure_ascii=False) except Exception as e: data["status"] = 500 return json.dumps(data, ensure_ascii=False) @mcp.tool() async def close_vnc_server(display: int) -> str: """关闭 VNC 服务 Args: display: VNC 显示编号 """ data={"display": display} try: # 调用外部 close_vnc_server.sh 脚本,传入 display 作为参数 process = await asyncio.create_subprocess_exec( f"{SHELL_DIR}/close_vnc_server.sh", str(display), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() data["message"] = stdout.decode() data["status"] = 200 if process.returncode == 0 else 500 if process.returncode == 0: return json.dumps(data, ensure_ascii=False) else: return json.dumps(data, ensure_ascii=False) except Exception as e: data["status"] = 500 return json.dumps(data, ensure_ascii=False) @mcp.resource("data://spider_code") def get_spider_code_list() -> str: spider_list = [ {"code": "01","name": "阳光采购爬虫","auth_name":"mail"}, {"code": "02","name": "QQ邮箱爬虫","auth_name":"mail"}, ] return json.dumps(spider_list, ensure_ascii=False) # 运行服务器 if __name__ == "__main__": mcp.run(transport="sse") # mcp.run(transport="stdio")