Files
vhbot/vhbot.py

103 lines
4.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
import subprocess
# Путь к vhserver
VHSERVER_PATH = '/home/domi/linuxgsm'
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
# Глобальный список для хранения ID всех сообщений бота
bot_message_ids = []
# Команда для управления сервером
def run_local_command(command):
try:
result = subprocess.run(command, shell=True, check=True, capture_output=True, text=True)
output = result.stdout
error = result.stderr
return output, error
except subprocess.CalledProcessError as e:
return None, str(e)
# Функция для удаления всех предыдущих сообщений бота
async def delete_previous_messages(update: Update, context: ContextTypes.DEFAULT_TYPE):
global bot_message_ids
for message_id in bot_message_ids:
try:
await context.bot.delete_message(chat_id=update.effective_chat.id, message_id=message_id)
except Exception as e:
logger.warning(f"Не удалось удалить сообщение {message_id}: {e}")
# Очищаем список сообщений
bot_message_ids = []
# Команда для запуска сервера
async def start_server(update: Update, context: ContextTypes.DEFAULT_TYPE):
logger.info("Получена команда /start_server")
await delete_previous_messages(update, context) # Удаляем все предыдущие сообщения
output, error = run_local_command(f'cd {VHSERVER_PATH} && ./vhserver start')
if error:
message = await update.message.reply_text(f'Ошибка: {error}')
else:
message = await update.message.reply_text('Сервер запущен.')
# Добавляем ID нового сообщения в список
bot_message_ids.append(message.message_id)
# Команда для остановки сервера
async def stop_server(update: Update, context: ContextTypes.DEFAULT_TYPE):
logger.info("Получена команда /stop_server")
await delete_previous_messages(update, context) # Удаляем все предыдущие сообщения
output, error = run_local_command(f'cd {VHSERVER_PATH} && ./vhserver stop')
if error:
message = await update.message.reply_text(f'Ошибка: {error}')
else:
message = await update.message.reply_text('Сервер остановлен.')
# Добавляем ID нового сообщения в список
bot_message_ids.append(message.message_id)
# Команда для проверки статуса сервера
async def status_server(update: Update, context: ContextTypes.DEFAULT_TYPE):
logger.info("Получена команда /status_server")
await delete_previous_messages(update, context) # Удаляем все предыдущие сообщения
output, error = run_local_command(f'cd {VHSERVER_PATH} && ./vhserver details')
if error:
message = await update.message.reply_text(f'Ошибка: {error}')
else:
message = await update.message.reply_text(f'Статус сервера: {output}')
# Добавляем ID нового сообщения в список
bot_message_ids.append(message.message_id)
# Команда для обновления сервера
async def update_server(update: Update, context: ContextTypes.DEFAULT_TYPE):
logger.info("Получена команда /update_server")
await delete_previous_messages(update, context) # Удаляем все предыдущие сообщения
output, error = run_local_command(f'cd {VHSERVER_PATH} && ./vhserver update')
if error:
message = await update.message.reply_text(f'Ошибка: {error}')
else:
message = await update.message.reply_text('Сервер обновлен.')
# Добавляем ID нового сообщения в список
bot_message_ids.append(message.message_id)
# Основная функция
def main():
application = Application.builder().token("6517664407:AAE9Tt-sjnU20MdrGUhdIyY-R6LU8eNznFs").build()
# Регистрация команд
application.add_handler(CommandHandler("start_server", start_server))
application.add_handler(CommandHandler("stop_server", stop_server))
application.add_handler(CommandHandler("status_server", status_server))
application.add_handler(CommandHandler("update_server", update_server)) # Добавляем новую команду
# Запуск бота
application.run_polling()
if __name__ == '__main__':
main()