learn-spider/app.py

432 lines
13 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
import logging
import os
import re
import subprocess
from typing import Optional

from fastmcp import FastMCP

from spider.mail_qq import start as start_mail_qq_spider
logging.basicConfig(level=logging.INFO)
# Log records are written to stderr, so they do not pollute stdout.
logging.info("服务器启动")
# MCP server object; the functions below register themselves on it via
# the @mcp.tool() / @mcp.resource() decorators.
mcp = FastMCP("spider-server")
# Base URL used to build the vnc_lite.html links returned by the VNC tools.
VNC_SERVER_HOST = "https://10.10.40.19:6080"
# SHELL_DIR = "/home/dgs/vnc-server"
# Directory containing the helper shell scripts the tools execute
# (a relative path, so it is resolved against the process working directory).
SHELL_DIR = "./bin"
@mcp.tool()
async def mail_qq_spider(account: str, display: int) -> str:
    """Run the QQ Mail spider for one account.

    The spider entry point is executed through ``asyncio.to_thread`` so the
    event loop is not blocked while it runs.

    Args:
        account: QQ Mail user name.
        display: Desktop (display) number handed to the spider.

    Returns:
        A JSON string containing ``display`` and ``status`` (200 or 500),
        plus ``result`` on success or ``message`` describing the failure.
    """
    data = {"display": display}
    try:
        result = await asyncio.to_thread(start_mail_qq_spider, account, display)
        data["status"] = 200
        data["result"] = result
        return json.dumps(data, ensure_ascii=False)
    except Exception as e:
        # Record the failure instead of silently discarding it, and drop a
        # possibly non-serialisable result so the error payload can be dumped.
        logging.exception("mail_qq_spider failed (display=%s)", display)
        data.pop("result", None)
        data["status"] = 500
        data["message"] = str(e)
        return json.dumps(data, ensure_ascii=False)
@mcp.tool()
async def start_xvnc_server(instance: int = 1, password: str = "123456") -> str:
    """Start a standalone VNC + Chrome instance via ``browser-vnc.sh start``.

    Args:
        instance: Instance number (documented range 1-10). The display
            number and VNC port are derived from it:
            instance 1 -> display 99, VNC port 5900;
            instance 2 -> display 100, VNC port 5901; and so on.
        password: VNC connection password, default ``123456``.

    Returns:
        A JSON string. On success it carries the script output plus
        ``vnc_url``, ``vnc_port``, ``display``, ``password`` and
        ``public_ip``; on failure ``status`` is 500 and ``message`` holds
        the error text.
    """
    data = {
        "instance": instance,
        "status": 200,
        "action": "start",
    }
    try:
        process = await asyncio.create_subprocess_exec(
            f"{SHELL_DIR}/browser-vnc.sh",
            "start",
            str(instance),
            password,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        # errors="replace": undecodable script output must not turn a
        # successful start into an error response.
        output = stdout.decode(errors="replace")
        if process.returncode == 0:
            # subprocess.getoutput blocks, so run it in a worker thread
            # rather than on the event loop.
            public_ip = await asyncio.to_thread(
                subprocess.getoutput, "hostname -I | awk '{print $1}'"
            )
            data["message"] = output
            data["vnc_url"] = f"{VNC_SERVER_HOST}/vnc_lite.html?path=?token=user{instance}"
            data["vnc_port"] = 5900 + (instance - 1)
            data["display"] = 98 + instance
            data["password"] = password
            data["public_ip"] = public_ip.strip()
        else:
            data["status"] = 500
            data["message"] = stderr.decode(errors="replace") or output or "Unknown error"
        return json.dumps(data, ensure_ascii=False)
    except Exception as e:
        data["status"] = 500
        data["message"] = str(e)
        return json.dumps(data, ensure_ascii=False)
@mcp.tool()
async def stop_xvnc_server(instance: int = 1, keep_data: bool = False) -> str:
    """Stop one VNC instance through ``browser-vnc.sh``.

    Args:
        instance: Instance number (documented range 1-10).
        keep_data: When True the ``stop-keep-data`` subcommand is used so
            user data is retained; otherwise ``stop`` is used (documented
            as deleting the data).

    Returns:
        A JSON string with ``instance``, ``status``, ``action``,
        ``keep_data`` and a ``message`` (script output or error text).
    """
    payload = {
        "instance": instance,
        "status": 200,
        "action": "stop",
        "keep_data": keep_data,
    }
    try:
        # Pick the subcommand: "stop" or "stop-keep-data".
        subcommand = "stop"
        if keep_data:
            subcommand = "stop-keep-data"
        script = f"{SHELL_DIR}/browser-vnc.sh"
        proc = await asyncio.create_subprocess_exec(
            script,
            subcommand,
            str(instance),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        out_bytes, err_bytes = await proc.communicate()
        text = out_bytes.decode()
        if proc.returncode != 0:
            payload["status"] = 500
            payload["message"] = err_bytes.decode() or text or "Unknown error"
        else:
            payload["message"] = text
    except Exception as exc:
        payload["status"] = 500
        payload["message"] = str(exc)
    return json.dumps(payload, ensure_ascii=False)
@mcp.tool()
async def stop_all_xvnc_servers(keep_data: bool = False) -> str:
    """Stop every VNC instance through ``browser-vnc.sh``.

    Args:
        keep_data: When True the ``stop-all-keep-data`` subcommand is used
            so user data is retained; otherwise ``stop-all`` is used
            (documented as deleting all data).

    Returns:
        A JSON string with ``status``, ``action``, ``keep_data`` and a
        ``message`` (script output or error text).
    """
    payload = {
        "status": 200,
        "action": "stop_all",
        "keep_data": keep_data,
    }
    try:
        # Pick the subcommand: "stop-all" or "stop-all-keep-data".
        subcommand = "stop-all"
        if keep_data:
            subcommand = "stop-all-keep-data"
        script = f"{SHELL_DIR}/browser-vnc.sh"
        proc = await asyncio.create_subprocess_exec(
            script,
            subcommand,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        out_bytes, err_bytes = await proc.communicate()
        text = out_bytes.decode()
        if proc.returncode != 0:
            payload["status"] = 500
            payload["message"] = err_bytes.decode() or text or "Unknown error"
        else:
            payload["message"] = text
    except Exception as exc:
        payload["status"] = 500
        payload["message"] = str(exc)
    return json.dumps(payload, ensure_ascii=False)
@mcp.tool()
async def list_xvnc_servers() -> str:
    """List the VNC instances that ``browser-vnc.sh status`` reports as running.

    A line counts as a running instance when it contains ``✅ Running`` and
    an ``Instance <n>:`` label; port and display are derived from ``<n>``
    (port ``5900 + n - 1``, display ``98 + n``).

    Returns:
        A JSON string with ``instances``, ``count`` and ``raw_output``.
        ``status`` is 500 (with ``message``) when the script exits non-zero
        or an exception occurs.
    """
    data = {
        "status": 200,
        "action": "list",
        "instances": [],
    }
    try:
        process = await asyncio.create_subprocess_exec(
            f"{SHELL_DIR}/browser-vnc.sh",
            "status",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        output = stdout.decode(errors="replace")
        if process.returncode != 0:
            # A failed status call must not look like "no instances running".
            data["status"] = 500
            data["message"] = stderr.decode(errors="replace") or output or "Unknown error"
        # Pick out running instances, e.g. "Instance 1: ✅ Running".
        for line in output.splitlines():
            if "✅ Running" not in line:
                continue
            match = re.search(r'Instance (\d+):', line)
            if match is None:
                continue
            instance = int(match.group(1))
            data["instances"].append({
                "instance": instance,
                "vnc_port": 5900 + (instance - 1),
                "display": 98 + instance,
                "vnc_url": f"{VNC_SERVER_HOST}/vnc_lite.html?path=?token=user{instance}",
            })
        data["count"] = len(data["instances"])
        data["raw_output"] = output
        return json.dumps(data, ensure_ascii=False)
    except Exception as e:
        data["status"] = 500
        data["message"] = str(e)
        return json.dumps(data, ensure_ascii=False)
@mcp.tool()
async def get_vnc_connection_info(instance: int = 1) -> str:
    """Return connection details for one VNC instance.

    The running state is taken from the first line of
    ``browser-vnc.sh status`` output that contains ``Instance <instance>:``;
    the instance is considered running when that line also contains
    ``✅ Running``.

    Args:
        instance: Instance number (documented range 1-10).

    Returns:
        A JSON string with ``is_running``, ``vnc_url``, ``vnc_port``,
        ``display`` and ``public_ip``, plus a ``warning`` when the instance
        is not running. On an exception ``status`` is 500 and ``message``
        holds the error text.
    """
    data = {
        "instance": instance,
        "status": 200,
    }
    try:
        # Ask the script for the current status of all instances.
        process = await asyncio.create_subprocess_exec(
            f"{SHELL_DIR}/browser-vnc.sh",
            "status",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _ = await process.communicate()
        output = stdout.decode(errors="replace")
        instance_pattern = f"Instance {instance}:"
        # Only the first matching line decides the state.
        status_line = next(
            (line for line in output.split('\n') if instance_pattern in line),
            "",
        )
        is_running = "✅ Running" in status_line
        # subprocess.getoutput blocks, so run it in a worker thread rather
        # than on the event loop.
        public_ip = (
            await asyncio.to_thread(
                subprocess.getoutput, "hostname -I | awk '{print $1}'"
            )
        ).strip()
        data["is_running"] = is_running
        data["vnc_url"] = f"{VNC_SERVER_HOST}/vnc_lite.html?path=?token=user{instance}"
        data["vnc_port"] = 5900 + (instance - 1)
        data["display"] = 98 + instance
        data["public_ip"] = public_ip
        if not is_running:
            data["warning"] = "Instance is not running, start it with start_xvnc_server"
        return json.dumps(data, ensure_ascii=False)
    except Exception as e:
        data["status"] = 500
        data["message"] = str(e)
        return json.dumps(data, ensure_ascii=False)
@mcp.tool()
async def cleanup_xvnc_data(instance: Optional[int] = None) -> str:
    """Clean up user data of VNC instances through ``browser-vnc.sh``.

    Args:
        instance: Instance number to clean (``cleanup <instance>``). When
            None, ``cleanup-all`` is run for every instance.

    Returns:
        A JSON string with ``status``, ``action``, ``instance`` (only when
        one was given) and a ``message`` (script output or error text).
    """
    data = {
        "status": 200,
        "action": "cleanup",
    }
    try:
        # Build the subcommand once instead of duplicating the exec call.
        if instance is not None:
            script_args = ["cleanup", str(instance)]
            data["instance"] = instance
        else:
            script_args = ["cleanup-all"]
        process = await asyncio.create_subprocess_exec(
            f"{SHELL_DIR}/browser-vnc.sh",
            *script_args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        output = stdout.decode(errors="replace")
        if process.returncode == 0:
            data["message"] = output
        else:
            data["status"] = 500
            data["message"] = stderr.decode(errors="replace") or output or "Unknown error"
        return json.dumps(data, ensure_ascii=False)
    except Exception as e:
        data["status"] = 500
        data["message"] = str(e)
        return json.dumps(data, ensure_ascii=False)
@mcp.tool()
async def start_vnc_server(display: int) -> str:
    """Start a VNC service by running ``start_vnc_server.sh <display>``.

    Args:
        display: VNC display number, passed to the script as its only
            argument.

    Returns:
        A JSON string with ``display``, ``message`` and ``status`` (200 or
        500). On success ``vnc_url`` is included; on failure ``message``
        prefers the script's stderr, then its stdout.
    """
    data = {"display": display}
    try:
        process = await asyncio.create_subprocess_exec(
            f"{SHELL_DIR}/start_vnc_server.sh",
            str(display),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        output = stdout.decode(errors="replace")
        if process.returncode == 0:
            data["message"] = output
            data["status"] = 200
            data["vnc_url"] = f"{VNC_SERVER_HOST}/vnc_lite.html?path=?token=user{display}"
        else:
            # Surface stderr so the caller can see why the script failed.
            data["message"] = stderr.decode(errors="replace") or output or "Unknown error"
            data["status"] = 500
    except Exception as e:
        # Report the error text instead of a bare 500.
        data["status"] = 500
        data["message"] = str(e)
    return json.dumps(data, ensure_ascii=False)
@mcp.tool()
async def start_spider(spider_code: str, display: str) -> str:
    """Start a spider by running ``start_spider.sh <spider_code> <display>``.

    Args:
        spider_code: Spider code passed to the script as its first argument.
        display: Desktop (display) number passed as the second argument.

    Returns:
        A JSON string with ``display``, ``message`` and ``status`` (200 or
        500). On failure ``message`` prefers the script's stderr, then its
        stdout.
    """
    data = {"display": display}
    try:
        process = await asyncio.create_subprocess_exec(
            f"{SHELL_DIR}/start_spider.sh",
            str(spider_code),
            # str() keeps the exec call valid even if a caller passes an int.
            str(display),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        output = stdout.decode(errors="replace")
        if process.returncode == 0:
            data["message"] = output
            data["status"] = 200
        else:
            # Surface stderr so the caller can see why the script failed.
            data["message"] = stderr.decode(errors="replace") or output or "Unknown error"
            data["status"] = 500
    except Exception as e:
        # Report the error text instead of a bare 500.
        data["status"] = 500
        data["message"] = str(e)
    return json.dumps(data, ensure_ascii=False)
@mcp.tool()
async def close_vnc_server(display: int) -> str:
    """Shut down a VNC service by running ``close_vnc_server.sh <display>``.

    Args:
        display: VNC display number, passed to the script as its only
            argument.

    Returns:
        A JSON string with ``display``, ``message`` and ``status`` (200 or
        500). On failure ``message`` prefers the script's stderr, then its
        stdout.
    """
    data = {"display": display}
    try:
        process = await asyncio.create_subprocess_exec(
            f"{SHELL_DIR}/close_vnc_server.sh",
            str(display),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        output = stdout.decode(errors="replace")
        if process.returncode == 0:
            data["message"] = output
            data["status"] = 200
        else:
            # Surface stderr so the caller can see why the script failed.
            data["message"] = stderr.decode(errors="replace") or output or "Unknown error"
            data["status"] = 500
    except Exception as e:
        # Report the error text instead of a bare 500.
        data["status"] = 500
        data["message"] = str(e)
    return json.dumps(data, ensure_ascii=False)
@mcp.resource("data://spider_code")
def get_spider_code_list() -> str:
    """Return the hard-coded catalogue of spiders as a JSON array string.

    Each entry has the keys ``code``, ``name`` and ``auth_name``.
    """
    columns = ("code", "name", "auth_name")
    rows = (
        ("01", "阳光采购爬虫", "mail"),
        ("02", "QQ邮箱爬虫", "mail"),
    )
    catalogue = [dict(zip(columns, row)) for row in rows]
    return json.dumps(catalogue, ensure_ascii=False)
# Run the server when this file is executed as a script.
if __name__ == "__main__":
    # Serve over the SSE transport; the stdio variant is kept below for reference.
    mcp.run(transport="sse")
    # mcp.run(transport="stdio")