В рамках данного проекта реализуется автоматизация отложенных публикаций средствами языка Python.
Устанавливаем и импортируем библиотеки#
pip install telethon, asyncio, aiohttp
import asyncio
import sqlite3
import logging
import random
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from telethon import TelegramClient
from telethon.errors import SessionPasswordNeededError
import aiohttp
Конфигурация логирования#
logging.basicConfig(
filename='quote_to_channel.log',
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
Добавляем базовую конфигурацию#
# Конфигураци
API_ID = 12345678 # Ваш API ID
API_HASH = '1a293da1150a1212b3883332d46a578' # Ваш API Hash
PHONE = '+71234567890' # Номер телефона
DB_NAME = 'telegram_quotes.db'
TARGET_CHANNEL = '@TruthVibee' # Ваш канал
RUN_COUNT = 1 # Количество выполнений для заполнения отложенных публикаций
SCHEDULE_TIMES = ['10:00', '18:00'] # Время отправки (EEST)
TIMEZONE = 'Europe/Tallinn' # EEST часовой пояс (UTC+3)
TIME_OFFSET_MINUTES = 15 # Максимальное смещение времени (±15 минут)
FORISMATIC_API_URL = 'http://api.forismatic.com/api/1.0/'
Нам понадобится API_ID и API_HASH инструкция Как получить API_ID и API_HASH Telegram.
Далее указываем телефон PHONE, имя базы данных DB_NAME (скрипт создаст ее, если она не сущетсвует)
TARGET_CHANNEL - Указываем ссылку на канал в который будем делать отложенные посты
RUN_COUNT - Количество выполнений (дней которые мы хотим запланировать)
SCHEDULE_TIMES - Время отправка поста в запланированном расписании
TIMEZONE - часовой пояс
TIME_OFFSET_MINUTES - разброс времени, чтобы сообщения были отправлены в разное время
FORISMATIC_API_URL - в данном случае используем API forismatic.com
Инициализируем базу данных#
def init_db(db_name='telegram_quotes.db'):
"""Инициализация базы данных SQLite"""
logger.info("Инициализация базы данных")
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS scheduled_quotes (
quote_id INTEGER PRIMARY KEY AUTOINCREMENT,
text TEXT,
author TEXT
scheduled_date TEXT,
target_channel TEXT
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS used_keys (
key_id INTEGER PRIMARY KEY AUTOINCREMENT,
api_key INTEGER UNIQUE
)
''')
conn.commit()
logger.info("Таблицы scheduled_quotes и used_keys созданы или уже существуют")
return conn
Получаем цитату#
async def fetch_quote(conn):
"""Получение случайной цитаты с Forismatic API с проверкой автора и уникальности ключа"""
cursor = conn.cursor()
max_attempts = 10 # Ограничение попыток для избежания бесконечного цикла
for attempt in range(max_attempts):
# Получаем список использованных ключей
cursor.execute('SELECT api_key FROM used_keys')
used_keys = {row[0] for row in cursor.fetchall()}
# Генерируем новый ключ, пока не найдем уникальный
new_key = random.randint(1, 999999)
while new_key in used_keys and len(used_keys) < 999999:
new_key = random.randint(1, 999999)
params = {
'method': 'getQuote',
'format': 'json',
'lang': 'ru',
'key': new_key
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(FORISMATIC_API_URL, data=params) as response:
if response.status != 200:
logger.error(f"Ошибка API: статус {response.status}")
return None, None
data = await response.json()
quote_text = data.get('quoteText', '').strip()
quote_author = data.get('quoteAuthor', '').strip() or 'Unknown'
if not quote_text:
logger.warning("Получена пустая цитата")
continue
logger.info(f"Получена цитата: {quote_text[:50]}...")
# Проверяем наличие автора и повторяем запрос, если автор отсутствует
if not quote_author or quote_author == 'Unknown':
logger.warning(f"Отсутствует автор или автор 'Unknown' на попытке {attempt + 1}")
continue
# Сохраняем использованный ключ
cursor.execute('INSERT INTO used_keys (api_key) VALUES (?)', (new_key,))
conn.commit()
logger.info(f"Сохранен уникальный ключ: {new_key}")
return quote_text, quote_author
except Exception as e:
logger.error(f"Ошибка при получении цитаты: {e}")
continue
logger.error(f"Не удалось получить цитату с автором после {max_attempts} попыток")
return None, None
Обрабатывем полученные данные и отправляем в телеграм канал#
async def process_quotes():
"""Обработка и планирование цитат"""
client = TelegramClient('session_name', API_ID, API_HASH)
conn = init_db(DB_NAME)
cursor = conn.cursor()
try:
logger.info("Подключение к Telegram API")
await client.start(phone=PHONE)
if not await client.is_user_authorized():
logger.error("Ошибка: Пользователь не авторизован")
print("Требуется авторизация. Пожалуйста, следуйте инструкциям.")
return
# Извлечение последней запланированной даты
cursor.execute('''
SELECT MAX(scheduled_date) FROM scheduled_quotes WHERE target_channel = ?
''', (TARGET_CHANNEL,))
last_scheduled_date = cursor.fetchone()[0]
tz = ZoneInfo(TIMEZONE)
if last_scheduled_date:
try:
last_date = datetime.fromisoformat(last_scheduled_date)
start_date = last_date.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
logger.info(f"Последняя запланированная дата: {last_scheduled_date}, начинаем с {start_date}")
except ValueError as e:
logger.error(f"Не удалось разобрать дату {last_scheduled_date}: {e}, использую текущую дату")
start_date = datetime.now(tz).replace(hour=0, minute=0, second=0, microsecond=0)
else:
start_date = datetime.now(tz).replace(hour=0, minute=0, second=0, microsecond=0)
logger.info(f"Запланированных цитат нет, начинаем с {start_date}")
# Получение цитат для указанного количества дней
schedule_times = [datetime.strptime(t, '%H:%M').time() for t in SCHEDULE_TIMES]
quotes_scheduled = 0
for _ in range(RUN_COUNT): # Выполнение для заданного количества дней
for i in range(2): # Две цитаты в день (10:00 и 18:00)
quote_text, quote_author = await fetch_quote(conn)
if not quote_text:
logger.warning("Не удалось получить цитату, пропускаем")
continue
time_index = i % len(schedule_times)
schedule_time = schedule_times[time_index]
schedule_date = start_date.replace(
hour=schedule_time.hour,
minute=schedule_time.minute,
second=0,
microsecond=0
)
# Добавление случайного смещения (±15 минут)
offset_minutes = random.randint(-TIME_OFFSET_MINUTES, TIME_OFFSET_MINUTES)
schedule_date += timedelta(minutes=offset_minutes)
logger.info(f"Применено случайное смещение {offset_minutes} минут для времени {schedule_time}")
# Если время в прошлом, перенести на следующий день
current_datetime = datetime.now(tz)
if schedule_date <= current_datetime:
schedule_date += timedelta(days=1)
logger.info(f"Время {schedule_date.strftime('%Y-%m-%d %H:%M:%S %Z')} в прошлом, перенесено на следующий день")
await schedule_quote(client, quote_text, quote_author, TARGET_CHANNEL, schedule_date, conn)
quotes_scheduled += 1
start_date += timedelta(days=1) # Переход к следующему дню
logger.info(f"Запланировано {quotes_scheduled} новых цитат")
print(f"Запланировано {quotes_scheduled} новых цитат")
except SessionPasswordNeededError:
logger.error("Требуется двухфакторная аутентификация")
print("Требуется двухфакторная аутентификация. Введите пароль:")
password = input()
await client.sign_in(password=password)
await process_quotes()
except Exception as e:
logger.error(f"Произошла ошибка: {e}")
print(f"Произошла ошибка: {e}")
finally:
await client.disconnect()
logger.info("Отключение от Telegram API")
conn.close()
logger.info("Соединение с базой данных закрыто")
Вызываем функцию main#
async def main():
"""Основная функция"""
logger.info("Запуск программы отправки цитат")
await process_quotes()
if __name__ == "__main__":
asyncio.run(main())
