learn-spider/app.py

102 lines
2.6 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastmcp import FastMCP
from spider.mail_qq import start as a1
mcp = FastMCP("spider-server")
import logging
logging.basicConfig(level=logging.INFO)
# 日志会输出到 stderr不会污染 stdout
logging.info("服务器启动")
@mcp.tool()
def mail_qq_spider(account: str) -> str:
"""qq邮箱爬虫,第一个参数是用户名"""
return a1(account)
# 添加一个简单的工具
@mcp.tool()
def hello(name: str) -> str:
"""Say hello to someone"""
return f"Hello, {name}!"
# 运行服务器
if __name__ == "__main__":
mcp.run(transport="sse")
# mcp.run(transport="stdio")
from fastmcp import FastMCP
import subprocess
import asyncio
import tempfile
import os
mcp = FastMCP("VNC Spider Manager")
@mcp.tool()
async def run_spider_in_vnc(index: int, username: str = None) -> str:
"""在 VNC 会话中运行爬虫
Args:
index: VNC 显示编号
username: 用户名(默认 user{index}
"""
if username is None:
username = f"user{index}"
try:
# 创建临时脚本
with tempfile.NamedTemporaryFile(mode='w', suffix='.sh', delete=False) as f:
f.write(f"""#!/bin/bash
PUBLIC_DIR="/shared"
username="{username}"
export DISPLAY=:{index}
echo "启动 VNC 服务"
sudo su - "$username" -c "vncserver :{index} -geometry 1280x800 -depth 24 -localhost no"
sudo su - "$username" -c "export DISPLAY=:{index} && xhost +" 2>/dev/null
sleep 3
cd $PUBLIC_DIR/learn-spider
sudo -E ./run.sh
sudo su - "$username" -c "vncserver -kill :{index}" 2>/dev/null
""")
script_path = f.name
# 执行脚本
os.chmod(script_path, 0o755)
process = await asyncio.create_subprocess_exec(
script_path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
# 清理临时文件
os.unlink(script_path)
if process.returncode == 0:
return f"✅ 爬虫执行成功\n输出:\n{stdout.decode()}"
else:
return f"❌ 爬虫执行失败\n错误:\n{stderr.decode()}"
except Exception as e:
return f"❌ 执行出错: {str(e)}"
@mcp.tool()
async def stop_vnc_server(index: int, username: str = None) -> str:
"""停止 VNC 服务器"""
if username is None:
username = f"user{index}"
result = subprocess.run(
f'sudo su - {username} -c "vncserver -kill :{index}"',
shell=True,
capture_output=True,
text=True
)
return f"VNC 服务器已停止: {result.stdout}"