learn-yolo/19.人脸识别-保存数据库-mysql.py

278 lines
9.1 KiB
Python

# 19.人脸识别-保存数据库-mysql.py
from insightface.app import FaceAnalysis
import cv2
import numpy as np
import pymysql
import os
from numpy.linalg import norm
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(), # 输出到控制台
logging.FileHandler('face_recognition.log', encoding='utf-8') # 同时保存到文件
]
)
logger = logging.getLogger(__name__)
# 初始化InsightFace模型
app = FaceAnalysis(name='buffalo_l', root='./')
app.prepare(ctx_id=0, det_size=(640, 640))
class FaceDatabase:
def __init__(self, host='localhost', port=3306, user='root', password='root', database='face_repository'):
self.host = host
self.port = port
self.user = user
self.password = password
self.database = database
self.init_database()
# 获取数据库连接
def get_connection(self):
return pymysql.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database,
charset='utf8mb4'
)
# 初始化数据库
def init_database(self):
connection = pymysql.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
charset='utf8mb4'
)
# 添加人脸到数据库
def add_face(self, name, embedding):
connection = self.get_connection()
try:
with connection.cursor() as cursor:
# 将numpy数组转换为bytes
embedding_blob = embedding.tobytes()
cursor.execute(
'INSERT INTO faces (name, embedding) VALUES (%s, %s)',
(name, embedding_blob)
)
connection.commit()
logger.info(f"✅ 成功注册: {name}")
return True
except pymysql.IntegrityError:
logger.warning(f"⚠️ 姓名已存在: {name}")
return False
except Exception as e:
logger.error(f"添加人脸失败: {e}")
return False
finally:
connection.close()
# 获取所有人脸数据
def get_all_faces(self):
connection = self.get_connection()
try:
with connection.cursor() as cursor:
cursor.execute('SELECT name, embedding FROM faces')
results = cursor.fetchall()
face_database = {}
for name, embedding_blob in results:
# 将bytes转换回numpy数组
embedding = np.frombuffer(embedding_blob, dtype=np.float32)
face_database[name] = embedding
logger.info(f"📊 从数据库加载了 {len(face_database)} 个人脸特征")
return face_database
except Exception as e:
logger.error(f"获取人脸数据失败: {e}")
return {}
finally:
connection.close()
# 删除人脸
def delete_face(self, face_id):
connection = self.get_connection()
try:
with connection.cursor() as cursor:
cursor.execute('DELETE FROM faces WHERE id = %s', (face_id,))
affected_rows = cursor.rowcount
connection.commit()
if affected_rows > 0:
logger.info(f"🗑️ 已删除: {face_id}")
return True
else:
logger.warning(f"❌ 未找到: {face_id}")
return False
except Exception as e:
logger.error(f"删除人脸失败: {e}")
return False
finally:
connection.close()
# 检查人脸是否存在
def face_exists(self, name):
connection = self.get_connection()
try:
with connection.cursor() as cursor:
cursor.execute('SELECT 1 FROM faces WHERE name = %s', (name,))
exists = cursor.fetchone() is not None
return exists
except Exception as e:
logger.error(f"检查人脸是否存在失败: {e}")
return False
finally:
connection.close()
# 创建全局数据库实例
face_db = FaceDatabase(host='localhost', port=3306, user='root', password='root', database='face_repository')
# 注册人脸
def register_face(image_path, person_name):
# 检查图片是否存在
if not os.path.exists(image_path):
logger.error(f"❌ 图片不存在: {image_path}")
return False
# 读取图片
img = cv2.imread(image_path)
if img is None:
logger.error(f"❌ 无法读取图片: {image_path}")
return False
# 使用InsightFace检测人脸
faces = app.get(img)
if len(faces) == 0:
logger.error(f"❌ 在 {image_path} 中未检测到人脸")
return False
elif len(faces) > 1:
logger.warning(f"⚠️ 在 {image_path} 中检测到 {len(faces)} 张人脸,使用第一张")
# 提取人脸特征
embedding = faces[0].embedding
logger.info(f"📐 提取到特征向量,维度: {embedding.shape}")
# 保存到数据库
return face_db.add_face(person_name, embedding)
# 从文件夹批量注册人脸
def register_faces_from_folder(folder_path):
if not os.path.exists(folder_path):
logger.error(f"❌ 文件夹不存在: {folder_path}")
return
supported_formats = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff')
registered_count = 0
for filename in os.listdir(folder_path):
if filename.lower().endswith(supported_formats):
# 使用文件名(不含扩展名)作为人名
person_name = os.path.splitext(filename)[0]
image_path = os.path.join(folder_path, filename)
if register_face(image_path, person_name):
registered_count += 1
logger.info(f"🎉 批量注册完成,共注册 {registered_count} 个人脸")
# 识别人脸
def recognize_face(image_path, threshold=0.6):
# 检查图片是否存在
if not os.path.exists(image_path):
logger.error(f"❌ 图片不存在: {image_path}")
return
# 读取图片
img = cv2.imread(image_path)
if img is None:
logger.error(f"❌ 无法读取图片: {image_path}")
return
# 从数据库加载所有人脸特征
face_database = face_db.get_all_faces()
if not face_database:
logger.error("❌ 数据库中无人脸数据,请先注册人脸")
return
# 使用InsightFace检测人脸
faces = app.get(img)
if len(faces) == 0:
logger.error("❌ 未检测到人脸")
return
logger.info(f"🔍 检测到 {len(faces)} 张人脸,开始识别...")
for i, face in enumerate(faces):
unknown_embedding = face.embedding
max_sim = -1
identity = "未知"
best_match_name = None
# 与数据库中的每个人脸进行比对
for name, known_embedding in face_database.items():
# 计算余弦相似度
cos_sim = unknown_embedding @ known_embedding.T / (norm(unknown_embedding) * norm(known_embedding))
if cos_sim > max_sim:
max_sim = cos_sim
identity = name
best_match_name = name
# 根据阈值决定最终身份
if max_sim < threshold:
identity = "未知"
# 输出结果
status = "" if identity != "未知" else ""
logger.info(f"{status} 人脸 {i + 1}: 识别为 【{identity}】, 相似度: {max_sim:.4f}")
# 显示匹配详情
if identity != "未知":
matched_embedding = face_database[best_match_name]
logger.info(f" 匹配特征: {best_match_name}, 范数: {norm(matched_embedding):.4f}")
# 列出所有已注册的人脸
def list_registered_faces():
face_database = face_db.get_all_faces()
if not face_database:
logger.info("📭 数据库为空")
return
logger.info(f"\n📋 已注册的人脸 ({len(face_database)} 个):")
for i, name in enumerate(face_database.keys(), 1):
logger.info(f" {i}. {name}")
# 删除指定人脸
def delete_face(face_id):
return face_db.delete_face(face_id)
# 使用示例
if __name__ == "__main__":
logger.info("=" * 50)
logger.info("🎭 InsightFace 人脸识别系统")
logger.info("=" * 50)
# # 1. 批量注册人脸
# logger.info("\n1.批量注册人脸")
# register_faces_from_folder('./resources/face')
#
# # 2. 单个注册
# logger.info("\n2.单个注册")
# register_face('./resources/face/01.png', '张三')
# register_face('./resources/face/02.png', '李四')
#
# # 3. 查看已注册的人脸
# logger.info("\n3.已注册人脸列表")
# list_registered_faces()
# 4. 识别人脸
logger.info("\n4.人脸识别测试")
recognize_face('./resources/face/16.jpg', threshold=0.6)
# 5. 删除测试
# logger.info("\n5. 🗑️ 删除人脸测试")
# delete_face('测试删除')
logger.info("\n" + "=" * 50)
logger.info("程序执行完成")