# File size: 6,617 Bytes | b565092
def CutNICK(update_text, update_message):
    """Strip the bot's nickname prefix from a message, or reject the message.

    Args:
        update_text: The text (or caption) of the incoming message.
        update_message: The message object; its ``chat.type`` and
            ``reply_to_message`` attributes are inspected.

    Returns:
        * ``update_text`` unchanged when no nickname is configured
          (``config.NICK`` is falsy).
        * The text after the nickname, stripped of surrounding whitespace,
          when the text starts with the nickname (case-insensitive).
        * ``update_text`` unchanged when the chat is private, or when the
          message replies to a text message sent by a bot account that was
          not sent on behalf of a chat (``sender_chat`` is ``None``).
        * ``None`` otherwise, meaning the message is not addressed to the bot.
    """
    import config

    if not config.NICK:
        return update_text

    bot_nick = config.NICK.lower()
    nick_length = len(bot_nick)

    if update_text[:nick_length].lower() == bot_nick:
        return update_text[nick_length:].strip()

    if update_message.chat.type == 'private':
        return update_text

    reply = update_message.reply_to_message
    # ``from_user`` may be missing on the replied-to message; treat that as
    # "not a reply to a bot" instead of raising AttributeError.
    replied_to_bot = bool(
        reply
        and reply.text
        and reply.from_user
        and reply.from_user.is_bot
        and reply.sender_chat is None
    )
    return update_text if replied_to_bot else None
# Timeout value passed for every timeout argument of the file lookup below.
time_out = 600


async def get_file_url(file, context):
    """Look up a file through the bot and return its ``file_path``.

    Args:
        file: Object exposing a ``file_id`` attribute.
        context: Object whose ``bot.get_file`` coroutine performs the lookup.

    Returns:
        The ``file_path`` attribute of the object returned by ``get_file``.
    """
    timeout_kwargs = dict.fromkeys(
        ("read_timeout", "write_timeout", "connect_timeout", "pool_timeout"),
        time_out,
    )
    remote_file = await context.bot.get_file(file.file_id, **timeout_kwargs)
    return remote_file.file_path
from io import BytesIO
async def get_voice(file_id: str, context) -> str:
    """Download a voice file into memory and return its transcription.

    The file is fetched with ``context.bot.get_file``, downloaded as a
    bytearray, wrapped in an in-memory stream and handed to
    ``config.whisperBot.generate``. Nothing is written to disk, so there is
    no temporary file to clean up afterwards.

    Args:
        file_id: Identifier of the voice file to fetch.
        context: Object whose ``bot.get_file`` coroutine performs the lookup.

    Returns:
        Whatever ``config.whisperBot.generate`` returns, or a string starting
        with ``"error: "`` when any step raises.
    """
    try:
        voice_file = await context.bot.get_file(file_id)
        file_bytes = await voice_file.download_as_bytearray()

        # Build an in-memory byte stream from the downloaded audio.
        audio_stream = BytesIO(file_bytes)

        # Transcribe directly from the byte stream.
        import config
        return config.whisperBot.generate(audio_stream)
    except Exception as e:
        # Deliberate best-effort: the failure is reported as text to the
        # caller instead of being raised.
        return f"error: Temporarily unable to use voice function: {str(e)}"
import os
import sys
# Append the parent of the directory containing this file to the module
# search path.
# NOTE(review): presumably this is what lets project-level modules (e.g. the
# ``ModelMerge`` package imported further down) resolve — confirm against the
# repository layout.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
async def GetMesage(update_message, context, voice=True):
    """Extract the fields the bot needs from a single message.

    Args:
        update_message: The message object to inspect.
        context: Passed through to ``get_file_url`` / ``get_voice``.
        voice: When falsy, voice attachments are ignored.

    Returns:
        An 11-tuple ``(message, rawtext, image_url, chatid, messageid,
        reply_to_message_text, message_thread_id, convo_id, file_url,
        reply_to_message_file_content, voice_text)``. Fields that do not
        apply to the message are ``None``.

        ``message`` is the text/caption after ``CutNICK`` processing. For a
        plain text message ``rawtext`` is the untouched text; for captions
        ``rawtext`` holds the same ``CutNICK`` result as ``message``.
    """
    from ModelMerge.src.ModelMerge.utils.scripts import Document_extract

    image_url = None
    file_url = None
    reply_to_message_text = None
    message = None
    rawtext = None
    voice_text = None
    reply_to_message_file_content = None

    chatid = str(update_message.chat_id)
    message_thread_id = (
        update_message.message_thread_id if update_message.is_topic_message else None
    )
    # Topic messages get their own conversation id, separate from the chat's.
    convo_id = f"{chatid}_{message_thread_id}" if message_thread_id else chatid
    messageid = update_message.message_id

    if update_message.text:
        message = CutNICK(update_message.text, update_message)
        rawtext = update_message.text

    replied = update_message.reply_to_message
    if replied:
        reply_to_message_text = replied.text
        if replied.document:
            reply_file_url = await get_file_url(replied.document, context)
            # NOTE(review): the URL is passed as both the first and second
            # argument — confirm against Document_extract's signature.
            reply_to_message_file_content = Document_extract(reply_file_url, reply_file_url, None)

    if update_message.photo:
        # The last entry of the photo list is the one used.
        image_url = await get_file_url(update_message.photo[-1], context)
        if update_message.caption:
            message = rawtext = CutNICK(update_message.caption, update_message)

    if voice and update_message.voice:
        # Use a separate local so the ``voice`` flag parameter is not rebound.
        voice_file_id = update_message.voice.file_id
        voice_text = await get_voice(voice_file_id, context)
        if update_message.caption:
            message = rawtext = CutNICK(update_message.caption, update_message)

    # Documents and audio files are handled identically, in that order; a
    # later attachment overwrites ``file_url`` set by an earlier one.
    for attachment in (update_message.document, update_message.audio):
        if not attachment:
            continue
        file_url = await get_file_url(attachment, context)
        # A file whose URL ends in an image suffix doubles as the image when
        # no photo was attached.
        if image_url is None and file_url and file_url.endswith(("jpg", "png", "jpeg")):
            image_url = file_url
        if update_message.caption:
            message = rawtext = CutNICK(update_message.caption, update_message)

    return (
        message,
        rawtext,
        image_url,
        chatid,
        messageid,
        reply_to_message_text,
        message_thread_id,
        convo_id,
        file_url,
        reply_to_message_file_content,
        voice_text,
    )
async def GetMesageInfo(update, context, voice=True):
    """Pick the relevant message out of an update and extract its fields.

    The message is chosen in this order of precedence: ``edited_message``,
    then ``callback_query.message``, then ``message``.

    Args:
        update: The incoming update object.
        context: Passed through to ``GetMesage``.
        voice: Passed through to ``GetMesage``.

    Returns:
        A 12-tuple ``(message, rawtext, image_url, chatid, messageid,
        reply_to_message_text, update_message, message_thread_id, convo_id,
        file_url, reply_to_message_file_content, voice_text)``. All twelve
        entries are ``None`` when the update carries no usable message,
        including a callback query that has no message attached.
    """
    if update.edited_message:
        update_message = update.edited_message
    elif update.callback_query:
        update_message = update.callback_query.message
    elif update.message:
        update_message = update.message
    else:
        update_message = None

    # A callback query may come without a message; bail out with the
    # same all-None result as an empty update instead of crashing.
    if update_message is None:
        return (None,) * 12

    (
        message,
        rawtext,
        image_url,
        chatid,
        messageid,
        reply_to_message_text,
        message_thread_id,
        convo_id,
        file_url,
        reply_to_message_file_content,
        voice_text,
    ) = await GetMesage(update_message, context, voice)

    return (
        message,
        rawtext,
        image_url,
        chatid,
        messageid,
        reply_to_message_text,
        update_message,
        message_thread_id,
        convo_id,
        file_url,
        reply_to_message_file_content,
        voice_text,
    )
def safe_get(data, *keys):
    """Walk nested containers along *keys*, returning ``None`` on any miss.

    Dicts and lists are indexed with ``[]``; any other object is queried
    through its ``.get`` method. A ``KeyError``, ``IndexError``,
    ``AttributeError`` or ``TypeError`` at any step ends the walk with
    ``None``. With no keys, ``data`` itself is returned.
    """
    current = data
    for step in keys:
        try:
            if isinstance(current, (dict, list)):
                current = current[step]
            else:
                current = current.get(step)
        except (KeyError, IndexError, AttributeError, TypeError):
            return None
    return current
def is_emoji(character):
    """Return True if *character* is one code point inside a listed emoji range.

    Anything whose length is not exactly 1 is rejected up front.
    """
    if len(character) != 1:
        return False

    code_point = ord(character)

    # Unicode ranges treated as emoji (inclusive bounds).
    emoji_blocks = (
        (0x1F300, 0x1F5FF),  # Miscellaneous symbols and pictographs
        (0x1F600, 0x1F64F),  # Emoticons
        (0x1F680, 0x1F6FF),  # Transport and map symbols
        (0x2600, 0x26FF),    # Miscellaneous symbols
        (0x2700, 0x27BF),    # Dingbats
        (0x1F900, 0x1F9FF),  # Supplemental symbols and pictographs
    )

    # True as soon as the code point falls inside any one of the ranges.
    for low, high in emoji_blocks:
        if low <= code_point <= high:
            return True
    return False