# File: ragflow_python/api/db/services/antique_service.py
# (216 lines, 8.4 KiB, Python)

#
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import logging
import operator
from datetime import datetime
from functools import reduce

import peewee
from werkzeug.security import generate_password_hash, check_password_hash

from api.db import UserTenantRole
from api.db.db_models import DB, UserTenant
from api.db.db_models import User, Tenant, MesumAntique
from api.db.services.common_service import CommonService
from api.db.services.brief_service import MesumOverviewService
from api.utils import get_uuid, get_format_time, current_timestamp, datetime_format
from api.db import StatusEnum
class MesumAntiqueService(CommonService):
    """Query helpers for ``MesumAntique`` rows, scoped by ``mesum_id``."""

    model = MesumAntique

    @classmethod
    @DB.connection_context()
    def get_by_mesum_id(cls, mesum_id):
        """Return all rows of one museum ordered by category, then sort_order.

        Args:
            mesum_id: Value matched against ``MesumAntique.mesum_id``.

        Returns:
            A list of model instances; an empty list when the query fails
            (the failure is logged, not raised).
        """
        try:
            rows = (
                cls.model.select()
                .where(cls.model.mesum_id == mesum_id)
                .order_by(
                    cls.model.category,          # primary key: category
                    cls.model.sort_order.asc(),  # secondary key: sort_order ascending
                )
                .execute()
            )
            return list(rows)
        except Exception as e:
            # Deliberate best-effort: callers get an empty list on failure.
            # Log with traceback instead of printing to stdout.
            logging.exception("根据mesum_id查询失败: %s", e)
            return []

    @staticmethod
    def _load_category_order(mesum_id):
        """Return the category names in the order the museum overview lists them.

        Per the original 2025-09-22 note, the overview's ``category`` column
        holds a JSON array of objects such as
        ``{"name": "...", "audio": "...", "photo": "..."}``; only ``name`` is
        used here.

        Returns:
            A list of names, or an empty list when the overview record is
            missing, its category column is empty, or the JSON does not have
            the expected shape (a warning is logged in the last case).
        """
        overview = MesumOverviewService.query(id=mesum_id)
        if not overview:
            return []
        raw = overview[0].category
        if not raw or not raw.strip():
            return []
        try:
            entries = json.loads(raw.strip())
            return [entry["name"] for entry in entries]
        except (ValueError, TypeError, KeyError) as exc:
            # The overview only supplies an ordering hint; a malformed value
            # must not prevent the categories themselves from being returned.
            logging.warning(
                "Invalid category definition for mesum_id=%s: %s", mesum_id, exc
            )
            return []

    @classmethod
    @DB.connection_context()
    def get_all_categories(cls, mesum_id):
        """Return the distinct non-empty categories used by a museum's antiques.

        The result follows the order defined in the museum overview so the
        front end can display categories in the intended sequence; categories
        not listed there are placed after the listed ones.

        Args:
            mesum_id: Value matched against ``MesumAntique.mesum_id`` and used
                as the overview record id.

        Returns:
            A list of category strings.
        """
        preferred_order = cls._load_category_order(mesum_id)

        rows = (
            cls.model.select(cls.model.category)
            .where(cls.model.mesum_id == mesum_id)
            .distinct()
            .execute()
        )
        categories = [row.category for row in rows if row.category]

        # Map each name to its position for O(1) lookups while sorting.
        rank = {name: idx for idx, name in enumerate(preferred_order)}
        # Names missing from the overview share the largest rank (sorted last).
        fallback = len(preferred_order)
        return sorted(categories, key=lambda name: rank.get(name, fallback))

    @classmethod
    @DB.connection_context()
    def get_all_labels(cls):
        """Return every distinct, non-empty label across all museums."""
        rows = cls.model.select(cls.model.label).distinct().execute()
        return [row.label for row in rows if row.label]

    @classmethod
    @DB.connection_context()
    def get_labels_ext(cls, mesum_id, tags=None):
        """Return ``{category: [{"id": ..., "label": ...}, ...]}`` for a museum.

        Rows with an empty or NULL category are excluded. Rows are read in
        category order, then ascending ``sort_order``.

        Args:
            mesum_id: Value matched against ``MesumAntique.mesum_id``.
            tags: Optional tag filter.
                * ``None`` (default): no tag filtering at all.
                * ``str``: split on commas; blank items are dropped.
                * ``list``: used as given.
                * any other type: treated as an empty tag list.
                With a non-empty tag list a row matches if its ``tags`` column
                matches any of the tags; with an empty tag list only rows
                whose ``tags`` column is non-empty and non-NULL are returned.

        Returns:
            A dict keyed by category; each value is a list of dicts with the
            keys ``id`` and ``label``.
        """
        conditions = [
            cls.model.mesum_id == mesum_id,
            cls.model.category != "",
            cls.model.category.is_null(False),  # category must not be NULL
        ]

        if tags is not None:
            if isinstance(tags, str):
                tags_list = [tag.strip() for tag in tags.split(",") if tag.strip()]
            elif isinstance(tags, list):
                tags_list = tags
            else:
                tags_list = []

            if tags_list:
                # NOTE(review): contains(tag) is a substring match and already
                # subsumes the three comma-aware alternatives below, so a tag
                # also matches longer tags that merely contain it. Kept as-is
                # to preserve current results - confirm whether exact
                # comma-separated token matching was intended.
                tag_conditions = [
                    (cls.model.tags.contains(tag))
                    | (cls.model.tags.startswith(f"{tag},"))
                    | (cls.model.tags.endswith(f",{tag}"))
                    | (cls.model.tags == tag)
                    for tag in tags_list
                ]
                # OR the per-tag conditions together: any tag may match.
                conditions.append(reduce(operator.or_, tag_conditions))
            else:
                # A tags argument was passed but is empty: require a
                # non-empty, non-NULL tags column.
                conditions.append(
                    (cls.model.tags != "") & (cls.model.tags.is_null(False))
                )

        query = (
            cls.model.select()
            .where(*conditions)
            .order_by(
                cls.model.category,          # primary key: category
                cls.model.sort_order.asc(),  # secondary key: sort_order ascending
            )
        )

        grouped_data = {}
        for row in query.dicts().execute():
            grouped_data.setdefault(row["category"], []).append(
                {"id": row["id"], "label": row["label"]}
            )
        return grouped_data

    @classmethod
    @DB.connection_context()
    def get_labels_with_id(cls, mesum_id):
        """Return id, label and category of every antique of a museum.

        Rows are ordered by ``id``. Rows with an empty category are NOT
        filtered out here.

        Args:
            mesum_id: Value matched against ``MesumAntique.mesum_id``.

        Returns:
            A list of dicts with the keys ``id``, ``label`` and ``category``.
        """
        query = (
            cls.model.select()
            .where(cls.model.mesum_id == mesum_id)
            .order_by(cls.model.id)
        )
        return [
            {"id": row["id"], "label": row["label"], "category": row["category"]}
            for row in query.dicts()
        ]

    @classmethod
    @DB.connection_context()
    def get_antique_by_id(cls, mesum_id, antique_id):
        """Fetch one antique by museum id and antique id.

        Args:
            mesum_id: Value matched against ``MesumAntique.mesum_id``.
            antique_id: Value matched against ``MesumAntique.id``.

        Returns:
            The first matching row as a dict, or an empty list when no row
            matches (callers must handle both shapes).
        """
        query = cls.model.select().where(
            (cls.model.mesum_id == mesum_id) & (cls.model.id == antique_id)
        )
        rows = list(query.dicts().execute())
        return rows[0] if rows else rows