#
#  Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
import json
import logging
from datetime import datetime
from functools import reduce

import peewee
from werkzeug.security import generate_password_hash, check_password_hash

from api.db import UserTenantRole
from api.db.db_models import DB, UserTenant
from api.db.db_models import User, Tenant, MesumAntique
from api.db.services.common_service import CommonService
from api.db.services.brief_service import MesumOverviewService
from api.utils import get_uuid, get_format_time, current_timestamp, datetime_format
from api.db import StatusEnum


class MesumAntiqueService(CommonService):
    """Query helpers for ``MesumAntique`` rows.

    Every method is a classmethod that runs inside ``DB.connection_context()``.
    Rows are addressed by ``mesum_id`` and expose (at least) the ``id``,
    ``label``, ``category``, ``sort_order`` and ``tags`` fields used below.
    """

    model = MesumAntique

    @classmethod
    @DB.connection_context()
    def get_by_mesum_id(cls, mesum_id):
        """Return all rows for ``mesum_id`` ordered by category, then sort_order.

        Args:
            mesum_id: Value matched against ``MesumAntique.mesum_id``.

        Returns:
            A list of model instances. Any exception raised while querying is
            logged and an empty list is returned instead (best-effort lookup).
        """
        try:
            objs = (
                cls.model.select()
                .where(cls.model.mesum_id == mesum_id)
                .order_by(
                    cls.model.category,  # primary key: category
                    cls.model.sort_order.asc(),  # secondary key: sort_order ascending
                )
                .execute()
            )
            return list(objs)
        except Exception as e:
            # Deliberately swallowed so callers always get a list; the
            # traceback is logged rather than printed to stdout.
            logging.exception("根据mesum_id查询失败: %s", e)
            return []

    @classmethod
    @DB.connection_context()
    def get_all_categories(cls, mesum_id):
        """Return the distinct non-empty categories used by ``mesum_id``.

        The result is ordered to follow the category list stored on the
        overview record returned by ``MesumOverviewService.query(id=mesum_id)``
        so the front end can display categories in the configured order.
        Categories that are not listed there are placed last.

        Args:
            mesum_id: Value matched against ``MesumAntique.mesum_id`` and used
                as the id of the overview record.

        Returns:
            A list of category strings.
        """
        # Since 2025-09-22 the overview's ``category`` column holds a JSON
        # array of objects, e.g.
        #   [{"name": "...", "audio": "...", "photo": "..."}, ...]
        # Only the "name" of each entry is used, in array order.
        ordered_names = []
        mesum_brief = MesumOverviewService.query(id=mesum_id)
        if mesum_brief:
            raw_category = mesum_brief[0].category
            try:
                if raw_category and raw_category.strip():
                    parsed = json.loads(raw_category.strip())
                    ordered_names = [item["name"] for item in parsed]
            except (ValueError, TypeError, KeyError, AttributeError):
                # Missing or malformed category JSON must not break the
                # listing; fall back to the order returned by the database.
                logging.exception(
                    "Invalid category definition on overview record id=%s",
                    mesum_id,
                )
                ordered_names = []

        categories = [
            row.category
            for row in (
                cls.model.select(cls.model.category)
                .where(cls.model.mesum_id == mesum_id)
                .distinct()
                .execute()
            )
            if row.category
        ]

        # Map each configured name to its position for O(1) rank lookups.
        rank_by_name = {name: idx for idx, name in enumerate(ordered_names)}
        unlisted_rank = len(ordered_names)  # sorts unknown categories last

        # ``sorted`` is stable, so categories with the same rank keep the
        # relative order in which the database returned them.
        return sorted(
            categories,
            key=lambda item: rank_by_name.get(item, unlisted_rank),
        )

    @classmethod
    @DB.connection_context()
    def get_all_labels(cls):
        """Return every distinct, non-empty ``label`` across all rows."""
        distinct_rows = cls.model.select(cls.model.label).distinct().execute()
        return [row.label for row in distinct_rows if row.label]

    @classmethod
    @DB.connection_context()
    def get_labels_ext(cls, mesum_id, tags=None):
        """Return ``{category: [{"id": ..., "label": ...}, ...]}`` for ``mesum_id``.

        Rows whose category is empty or NULL are excluded. Rows are ordered by
        category and then by ascending ``sort_order``.

        Args:
            mesum_id: Value matched against ``MesumAntique.mesum_id``.
            tags: Optional tag filter.
                * ``None`` (default): no tag filtering at all.
                * ``str``: comma-separated tag names; blanks are dropped.
                * ``list``: tag names used as given.
                * any other type: treated as an empty tag list.
                When the resulting tag list is non-empty a row matches if its
                ``tags`` field matches any one of the tags; when it is empty
                only rows with a non-empty, non-NULL ``tags`` are returned.

        Returns:
            A dict mapping each category to the list of its
            ``{"id", "label"}`` entries, in query order.
        """
        # Base filters: the requested mesum_id and a usable category.
        conditions = [
            (cls.model.mesum_id == mesum_id),
            (cls.model.category != ""),
            (cls.model.category.is_null(False)),
        ]

        if tags is not None:
            if isinstance(tags, str):
                tags_list = [tag.strip() for tag in tags.split(',') if tag.strip()]
            elif isinstance(tags, list):
                tags_list = tags
            else:
                tags_list = []

            if tags_list:
                # NOTE(review): ``contains(tag)`` is a substring match and
                # already covers the three other alternatives, so a tag "a"
                # also matches a stored tag "ab". Kept unchanged to preserve
                # existing results - confirm whether exact token matching is
                # wanted.
                tag_conditions = [
                    (cls.model.tags.contains(tag))
                    | (cls.model.tags.startswith(f"{tag},"))
                    | (cls.model.tags.endswith(f",{tag}"))
                    | (cls.model.tags == tag)
                    for tag in tags_list
                ]
                # OR the per-tag conditions together: any tag may match.
                conditions.append(reduce(lambda a, b: a | b, tag_conditions))
            else:
                # A tags argument was supplied but is effectively empty:
                # require rows that have some tags value.
                conditions.append(
                    (cls.model.tags != "") & (cls.model.tags.is_null(False))
                )

        query = cls.model.select().where(*conditions).order_by(
            cls.model.category,  # primary key: category
            cls.model.sort_order.asc(),  # secondary key: sort_order ascending
        )

        # Group rows by category, preserving the query order within a group.
        grouped_data = {}
        for row in query.dicts().execute():
            grouped_data.setdefault(row['category'], []).append({
                'id': row['id'],
                'label': row['label'],
            })
        return grouped_data

    @classmethod
    @DB.connection_context()
    def get_labels_with_id(cls, mesum_id):
        """Return id, label and category of every row for ``mesum_id``.

        Unlike ``get_labels_ext`` no category filtering is applied; rows are
        ordered by ``id``.

        Args:
            mesum_id: Value matched against ``MesumAntique.mesum_id``.

        Returns:
            A list of ``{"id", "label", "category"}`` dicts.
        """
        query = cls.model.select().where(
            (cls.model.mesum_id == mesum_id)
        ).order_by(cls.model.id)

        return [
            {
                'id': row['id'],
                'label': row['label'],
                "category": row['category'],
            }
            for row in query.dicts()
        ]

    @classmethod
    @DB.connection_context()
    def get_antique_by_id(cls, mesum_id, antique_id):
        """Return the row matching both ``mesum_id`` and ``antique_id``.

        Args:
            mesum_id: Value matched against ``MesumAntique.mesum_id``.
            antique_id: Value matched against ``MesumAntique.id``.

        Returns:
            The first matching row as a dict, or an empty list when nothing
            matches. The empty-list result is kept for backward compatibility
            with existing callers.
        """
        query = cls.model.select().where(
            (cls.model.mesum_id == mesum_id) & (cls.model.id == antique_id)
        )
        rows = list(query.dicts().execute())
        return rows[0] if rows else rows