|
|
def CutNICK(update_text, update_message): |
|
|
import config |
|
|
botNick = config.NICK.lower() if config.NICK else None |
|
|
botNicKLength = len(botNick) if botNick else 0 |
|
|
|
|
|
update_chat = update_message.chat |
|
|
update_reply_to_message = update_message.reply_to_message |
|
|
if botNick is None: |
|
|
return update_text |
|
|
else: |
|
|
if update_text[:botNicKLength].lower() == botNick: |
|
|
return update_text[botNicKLength:].strip() |
|
|
else: |
|
|
if update_chat.type == 'private' or (botNick and update_reply_to_message and update_reply_to_message.text and update_reply_to_message.from_user.is_bot and update_reply_to_message.sender_chat == None): |
|
|
return update_text |
|
|
else: |
|
|
return None |
|
|
|
|
|
time_out = 600 |
|
|
async def get_file_url(file, context): |
|
|
file_id = file.file_id |
|
|
new_file = await context.bot.get_file(file_id, read_timeout=time_out, write_timeout=time_out, connect_timeout=time_out, pool_timeout=time_out) |
|
|
file_url = new_file.file_path |
|
|
return file_url |
|
|
|
|
|
from io import BytesIO |
|
|
async def get_voice(file_id: str, context) -> str: |
|
|
file_unique_id = file_id |
|
|
filename_mp3 = f'{file_unique_id}.mp3' |
|
|
|
|
|
try: |
|
|
file = await context.bot.get_file(file_id) |
|
|
file_bytes = await file.download_as_bytearray() |
|
|
|
|
|
|
|
|
audio_stream = BytesIO(file_bytes) |
|
|
|
|
|
|
|
|
import config |
|
|
transcript = config.whisperBot.generate(audio_stream) |
|
|
|
|
|
return transcript |
|
|
|
|
|
except Exception as e: |
|
|
return f"error: Temporarily unable to use voice function: {str(e)}" |
|
|
finally: |
|
|
import os |
|
|
if os.path.exists(filename_mp3): |
|
|
os.remove(filename_mp3) |
|
|
|
|
|
import os |
|
|
import sys |
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
|
|
|
|
|
async def GetMesage(update_message, context, voice=True): |
|
|
from ModelMerge.src.ModelMerge.utils.scripts import Document_extract |
|
|
image_url = None |
|
|
file_url = None |
|
|
reply_to_message_text = None |
|
|
message = None |
|
|
rawtext = None |
|
|
voice_text = None |
|
|
reply_to_message_file_content = None |
|
|
|
|
|
chatid = str(update_message.chat_id) |
|
|
if update_message.is_topic_message: |
|
|
message_thread_id = update_message.message_thread_id |
|
|
else: |
|
|
message_thread_id = None |
|
|
if message_thread_id: |
|
|
convo_id = str(chatid) + "_" + str(message_thread_id) |
|
|
else: |
|
|
convo_id = str(chatid) |
|
|
|
|
|
messageid = update_message.message_id |
|
|
|
|
|
if update_message.text: |
|
|
message = CutNICK(update_message.text, update_message) |
|
|
rawtext = update_message.text |
|
|
|
|
|
if update_message.reply_to_message: |
|
|
reply_to_message_text = update_message.reply_to_message.text |
|
|
reply_to_message_file = update_message.reply_to_message.document |
|
|
if reply_to_message_file: |
|
|
reply_to_message_file_url = await get_file_url(reply_to_message_file, context) |
|
|
reply_to_message_file_content = Document_extract(reply_to_message_file_url, reply_to_message_file_url, None) |
|
|
|
|
|
if update_message.photo: |
|
|
photo = update_message.photo[-1] |
|
|
|
|
|
image_url = await get_file_url(photo, context) |
|
|
|
|
|
if update_message.caption: |
|
|
message = rawtext = CutNICK(update_message.caption, update_message) |
|
|
|
|
|
if voice and update_message.voice: |
|
|
voice = update_message.voice.file_id |
|
|
voice_text = await get_voice(voice, context) |
|
|
|
|
|
if update_message.caption: |
|
|
message = rawtext = CutNICK(update_message.caption, update_message) |
|
|
|
|
|
if update_message.document: |
|
|
file = update_message.document |
|
|
|
|
|
file_url = await get_file_url(file, context) |
|
|
|
|
|
if image_url == None and file_url and (file_url[-3:] == "jpg" or file_url[-3:] == "png" or file_url[-4:] == "jpeg"): |
|
|
image_url = file_url |
|
|
|
|
|
if update_message.caption: |
|
|
message = rawtext = CutNICK(update_message.caption, update_message) |
|
|
|
|
|
if update_message.audio: |
|
|
file = update_message.audio |
|
|
|
|
|
file_url = await get_file_url(file, context) |
|
|
|
|
|
if image_url == None and file_url and (file_url[-3:] == "jpg" or file_url[-3:] == "png" or file_url[-4:] == "jpeg"): |
|
|
image_url = file_url |
|
|
|
|
|
if update_message.caption: |
|
|
message = rawtext = CutNICK(update_message.caption, update_message) |
|
|
|
|
|
return message, rawtext, image_url, chatid, messageid, reply_to_message_text, message_thread_id, convo_id, file_url, reply_to_message_file_content, voice_text |
|
|
|
|
|
async def GetMesageInfo(update, context, voice=True): |
|
|
if update.edited_message: |
|
|
message, rawtext, image_url, chatid, messageid, reply_to_message_text, message_thread_id, convo_id, file_url, reply_to_message_file_content, voice_text = await GetMesage(update.edited_message, context, voice) |
|
|
update_message = update.edited_message |
|
|
elif update.callback_query: |
|
|
message, rawtext, image_url, chatid, messageid, reply_to_message_text, message_thread_id, convo_id, file_url, reply_to_message_file_content, voice_text = await GetMesage(update.callback_query.message, context, voice) |
|
|
update_message = update.callback_query.message |
|
|
elif update.message: |
|
|
message, rawtext, image_url, chatid, messageid, reply_to_message_text, message_thread_id, convo_id, file_url, reply_to_message_file_content, voice_text = await GetMesage(update.message, context, voice) |
|
|
update_message = update.message |
|
|
else: |
|
|
return None, None, None, None, None, None, None, None, None, None, None, None |
|
|
return message, rawtext, image_url, chatid, messageid, reply_to_message_text, update_message, message_thread_id, convo_id, file_url, reply_to_message_file_content, voice_text |
|
|
|
|
|
def safe_get(data, *keys): |
|
|
for key in keys: |
|
|
try: |
|
|
data = data[key] if isinstance(data, (dict, list)) else data.get(key) |
|
|
except (KeyError, IndexError, AttributeError, TypeError): |
|
|
return None |
|
|
return data |
|
|
|
|
|
def is_emoji(character): |
|
|
if len(character) != 1: |
|
|
return False |
|
|
|
|
|
code_point = ord(character) |
|
|
|
|
|
|
|
|
emoji_ranges = [ |
|
|
(0x1F300, 0x1F5FF), |
|
|
(0x1F600, 0x1F64F), |
|
|
(0x1F680, 0x1F6FF), |
|
|
(0x2600, 0x26FF), |
|
|
(0x2700, 0x27BF), |
|
|
(0x1F900, 0x1F9FF) |
|
|
] |
|
|
|
|
|
|
|
|
return any(start <= code_point <= end for start, end in emoji_ranges) |