Spaces:
Paused
Paused
| import json | |
| import locale | |
| import os | |
| from pathlib import Path | |
| import threading | |
| from typing import Any | |
| from uuid import uuid4 | |
| import urllib3 | |
| from loguru import logger | |
| from app.models import const | |
| urllib3.disable_warnings() | |
| def get_response(status: int, data: Any = None, message: str = ""): | |
| obj = { | |
| "status": status, | |
| } | |
| if data: | |
| obj["data"] = data | |
| if message: | |
| obj["message"] = message | |
| return obj | |
| def to_json(obj): | |
| try: | |
| # Define a helper function to handle different types of objects | |
| def serialize(o): | |
| # If the object is a serializable type, return it directly | |
| if isinstance(o, (int, float, bool, str)) or o is None: | |
| return o | |
| # If the object is binary data, convert it to a base64-encoded string | |
| elif isinstance(o, bytes): | |
| return "*** binary data ***" | |
| # If the object is a dictionary, recursively process each key-value pair | |
| elif isinstance(o, dict): | |
| return {k: serialize(v) for k, v in o.items()} | |
| # If the object is a list or tuple, recursively process each element | |
| elif isinstance(o, (list, tuple)): | |
| return [serialize(item) for item in o] | |
| # If the object is a custom type, attempt to return its __dict__ attribute | |
| elif hasattr(o, "__dict__"): | |
| return serialize(o.__dict__) | |
| # Return None for other cases (or choose to raise an exception) | |
| else: | |
| return None | |
| # Use the serialize function to process the input object | |
| serialized_obj = serialize(obj) | |
| # Serialize the processed object into a JSON string | |
| return json.dumps(serialized_obj, ensure_ascii=False, indent=4) | |
| except Exception: | |
| return None | |
| def get_uuid(remove_hyphen: bool = False): | |
| u = str(uuid4()) | |
| if remove_hyphen: | |
| u = u.replace("-", "") | |
| return u | |
| def root_dir(): | |
| return os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) | |
| def storage_dir(sub_dir: str = "", create: bool = False): | |
| d = os.path.join(root_dir(), "storage") | |
| if sub_dir: | |
| d = os.path.join(d, sub_dir) | |
| if create and not os.path.exists(d): | |
| os.makedirs(d) | |
| return d | |
| def resource_dir(sub_dir: str = ""): | |
| d = os.path.join(root_dir(), "resource") | |
| if sub_dir: | |
| d = os.path.join(d, sub_dir) | |
| return d | |
| def task_dir(sub_dir: str = ""): | |
| d = os.path.join(storage_dir(), "tasks") | |
| if sub_dir: | |
| d = os.path.join(d, sub_dir) | |
| if not os.path.exists(d): | |
| os.makedirs(d) | |
| return d | |
| def font_dir(sub_dir: str = ""): | |
| d = resource_dir("fonts") | |
| if sub_dir: | |
| d = os.path.join(d, sub_dir) | |
| if not os.path.exists(d): | |
| os.makedirs(d) | |
| return d | |
| def song_dir(sub_dir: str = ""): | |
| d = resource_dir("songs") | |
| if sub_dir: | |
| d = os.path.join(d, sub_dir) | |
| if not os.path.exists(d): | |
| os.makedirs(d) | |
| return d | |
| def public_dir(sub_dir: str = ""): | |
| d = resource_dir("public") | |
| if sub_dir: | |
| d = os.path.join(d, sub_dir) | |
| if not os.path.exists(d): | |
| os.makedirs(d) | |
| return d | |
| def run_in_background(func, *args, **kwargs): | |
| def run(): | |
| try: | |
| func(*args, **kwargs) | |
| except Exception as e: | |
| logger.error(f"run_in_background error: {e}") | |
| thread = threading.Thread(target=run) | |
| thread.start() | |
| return thread | |
| def time_convert_seconds_to_hmsm(seconds) -> str: | |
| hours = int(seconds // 3600) | |
| seconds = seconds % 3600 | |
| minutes = int(seconds // 60) | |
| milliseconds = int(seconds * 1000) % 1000 | |
| seconds = int(seconds % 60) | |
| return "{:02d}:{:02d}:{:02d},{:03d}".format(hours, minutes, seconds, milliseconds) | |
| def text_to_srt(idx: int, msg: str, start_time: float, end_time: float) -> str: | |
| start_time = time_convert_seconds_to_hmsm(start_time) | |
| end_time = time_convert_seconds_to_hmsm(end_time) | |
| srt = """%d | |
| %s --> %s | |
| %s | |
| """ % ( | |
| idx, | |
| start_time, | |
| end_time, | |
| msg, | |
| ) | |
| return srt | |
| def str_contains_punctuation(word): | |
| for p in const.PUNCTUATIONS: | |
| if p in word: | |
| return True | |
| return False | |
| def split_string_by_punctuations(s): | |
| result = [] | |
| txt = "" | |
| previous_char = "" | |
| next_char = "" | |
| for i in range(len(s)): | |
| char = s[i] | |
| if char == "\n": | |
| result.append(txt.strip()) | |
| txt = "" | |
| continue | |
| if i > 0: | |
| previous_char = s[i - 1] | |
| if i < len(s) - 1: | |
| next_char = s[i + 1] | |
| if char == "." and previous_char.isdigit() and next_char.isdigit(): | |
| # # In the case of "withdraw 10,000, charged at 2.5% fee", the dot in "2.5" should not be treated as a line break marker | |
| txt += char | |
| continue | |
| if char not in const.PUNCTUATIONS: | |
| txt += char | |
| else: | |
| result.append(txt.strip()) | |
| txt = "" | |
| result.append(txt.strip()) | |
| # filter empty string | |
| result = list(filter(None, result)) | |
| return result | |
| def md5(text): | |
| import hashlib | |
| return hashlib.md5(text.encode("utf-8")).hexdigest() | |
| def get_system_locale(): | |
| try: | |
| loc = locale.getdefaultlocale() | |
| # zh_CN, zh_TW return zh | |
| # en_US, en_GB return en | |
| language_code = loc[0].split("_")[0] | |
| return language_code | |
| except Exception: | |
| return "en" | |
| def load_locales(i18n_dir): | |
| _locales = {} | |
| for root, dirs, files in os.walk(i18n_dir): | |
| for file in files: | |
| if file.endswith(".json"): | |
| lang = file.split(".")[0] | |
| with open(os.path.join(root, file), "r", encoding="utf-8") as f: | |
| _locales[lang] = json.loads(f.read()) | |
| return _locales | |
| def parse_extension(filename): | |
| return Path(filename).suffix.lower().lstrip('.') | |