Commit ec0b18ef authored by Jan Eggers

V0.2.0.0 with Telegram

parent 3700ddea
@@ -2,6 +2,22 @@ from src.aichecker.tg_check import *
if __name__ == "__main__":
# Bluesky-Check
handle_str = input("Handle des Kanals eingeben: ")
#handle_str = input("Handle des Kanals eingeben: ")
handle_str = "telegram"
channels_dict = tgc_profile(handle_str)
print(channels_dict)
\ No newline at end of file
last_post = channels_dict['n_posts']
print(channels_dict)
# Read one page (up to 16 posts) without saving media files
# and without transcribing audio
posts = tgc_blockread(channels_dict['name'],nr=1, save=False, describe=False)
print(posts)
# Now the most recent posts, with transcription/media files
#posts = tgc_read(channels_dict['name'],nr=None, save=True, transcribe=True)
#print(posts)
# Just a single post
posts = tgc_read(channels_dict['name'],nr=last_post)
print(posts)
# Via the post URL
print(tgc_read_url('https://t.me/telegram/46',save=True, describe=True))
posts = tgc_read_range(channels_dict['name'], last_post - 19, last_post, save = True, describe= True)
print("Ende")
\ No newline at end of file
File added
File added
media/wilhelmkachel_1076_photo.jpg (119 KiB)
# Reading Telegram posts
Posts can be read from channels in two ways:
- via their context page (t.me/s/<channel>/<id>)
- via their individual post page (t.me/<channel>/<id>)
## Context page
The context page is somewhat more convenient because:
- it loads completely right away (the post page loads an embed afterwards)
- it is also displayed when the post ID does not exist
## Post page
The post page loads the actual content as an embed in an iframe. It is possible to load this embed directly via requests:
- https://t.me/<channel>/<id>?embed=1&mode=tme
- embed parameter: any value > 0 seems to work
- mode parameter: other names are accepted as well
- Check whether the response contains a div.tgme_widget_message_error element - if so, the post could not be loaded
- Otherwise it contains a div.tgme_widget_message element, for example:
<div class="tgme_widget_message text_not_supported_wrap js-widget_message" data-post="telegram/361" data-view="eyJjIjotMTAwNTY0MDg5MiwicCI6MzYxLCJ0IjoxNzM1OTQxNTY5LCJoIjoiOGJmNWMzZDM1OTE0Y2I1NTMyIn0" data-peer="c1005640892_-6044378432856379164" data-peer-hash="556f33b85ddb50a1e1" data-post-id="361">
\ No newline at end of file
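A minimal sketch of the embed check described above, using only requests and BeautifulSoup (channel and post number are examples):

import requests
from bs4 import BeautifulSoup

def fetch_post_embed(channel: str, post_id: int):
    # Load the embed view of a single post
    url = f"https://t.me/{channel}/{post_id}?embed=1&mode=tme"
    soup = BeautifulSoup(requests.get(url).content, 'html.parser')
    # An error element means the post could not be loaded
    if soup.select_one("div.tgme_widget_message_error") is not None:
        return None
    return soup.select_one("div.tgme_widget_message")

# e.g. fetch_post_embed("telegram", 361)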
from .check_bsky import *
from .bildbeschreibung import ai_description
from .transcribe import ai_description
from .detectora import query_detectora
from .imagecheck import query_aiornot
from .tg_check import tgc_clean, tgc_url, tgc_blockread, tgc_collect, tgc_profile
\ No newline at end of file
from .tg_check import tgc_clean, tgc_read, tgc_blockread, tgc_read_url, tgc_profile
\ No newline at end of file
@@ -7,7 +7,7 @@ import json
import pandas as pd
from .detectora import query_detectora
from .imagecheck import query_aiornot
from .bildbeschreibung import gpt4_description
from .transcribe import gpt4_description
import requests
import os
......
# imagecheck.py
# Queries the AI probability for an image via the Hive and AIorNot APIs
from .bildbeschreibung import ai_description
from .transcribe import ai_description
import requests
import json
......
# tg_check.py
#
# Mistral translation from R (my old Rtgchannels project V0.11)
# Mistral translation from R (my old Rtgchannels project V0.1.1)
# Adapted to lists instead of dataframes
#
# Not everything tested and rewritten yet
# 1-2025 Jan Eggers
@@ -13,22 +12,18 @@ from bs4 import BeautifulSoup
from datetime import datetime
import os
import re
import base64
from .transcribe import gpt4_description, transcribe
def extract_k(n_str: str):
if n_str.endswith('K'):
try:
# Numbers like '5.06K', '1K'
n = int(float(n_str[:-1]) * 1000)
except:
return None
else:
try:
n = int(n_str)
except:
return None
return n
try:
# Numbers like '5.06K', '1K', '1.2M'
n_f = float(re.sub(r'[KMB]$', lambda m: {'K': 'e+03', 'M': 'e+06', 'B': 'e+09'}[m.group()], n_str))
return int(n_f)
except:
return None
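# Quick illustration of extract_k with made-up counter strings (not taken from any channel page):
# extract_k("5.06K") -> 5060, extract_k("1.2M") -> 1200000,
# extract_k("815") -> 815, extract_k("n/a") -> None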
def tgc_profile(channel="ffmfreiheit"):
def tgc_profile(channel="telegram"):
"""
Generates base statistics for a Telegram channel.
@@ -51,203 +46,255 @@ def tgc_profile(channel="ffmfreiheit"):
except requests.exceptions.RequestException:
print(f"Warning: Channel {c} not found")
return None
channel_info = {}
if tgm.select_one("div.tgme_channel_info_description") is not None:
description = tgm.select_one("div.tgme_channel_info_description").get_text()
else:
description = None
channel_info = {'name': c,
'description': description}
for info_counter in tgm.find_all('div', class_='tgme_channel_info_counter'):
counter_value = info_counter.find('span', class_='counter_value').text.strip()
counter_type = info_counter.find('span', class_='counter_type').text.strip()
channel_info[counter_type] = extract_k(counter_value)
return channel_info
"""
# Read values from the info card
counter_type = [span.get_text() for span in tgm.select('div.tgme_channel_info_counter span.counter_type')]
counter_values = [extract_k(re.sub(r'[KMB]$', lambda m: {'K': 'e+03', 'M': 'e+06', 'B': 'e+09'}[m.group()], span.get_text()))
for span in tgm.select('div.tgme_channel_info_counter span.counter_value')]
df = pd.DataFrame({'name': counter_type, 'values': counter_values}).pivot(index=None, columns='name', values='values').reset_index(drop=True)
# Add id, description, title
df['id'] = c
df['title'] = tgm.select_one('div.tgme_channel_info_header_title').get_text()
df['description'] = tgm.select_one('div.tgme_channel_info_description').get_text()
# The last post is visible on this page. Gather its number.
last_post_href = tgm.select('a.tgme_widget_message_date')[-1]['href']
df['last_post_n'] = int(re.search(r'[0-9]+$', last_post_href).group())
# The last post is visible on this page. Gather its number and date.
last_post_href = tgm.select('a.tgme_widget_message_date')[-1]['href']
channel_info['n_posts'] = int(re.search(r'[0-9]+$', last_post_href).group())
df['last_post_datetime'] = pd.to_datetime(tgm.select('time.time')[-1]['datetime'])
# Now get the first post.
tgm_firstpost = BeautifulSoup(requests.get(f"{c_url}/1").content, 'html.parser')
df['created'] = pd.to_datetime(tgm_firstpost.select_one('time')['datetime'])
# Calculate posts per week
df['post_per_week'] = df['last_post_n'] / ((datetime.now() - df['created']).days / 7)
if channels_df is None:
channels_df = df
else:
channels_df = pd.concat([channels_df, df], ignore_index=True)
return channel_info
return channels_df
"""
def tgc_clean(cname):
"""
Helper function returning a sanitized Telegram channel name in lowercase.
Parameters:
cname (str or list): Telegram channel name or URL.
cname (str): Telegram channel name or URL.
Returns:
str or list: Lower-case of the extracted channel name.
str: Lower-case of the extracted channel name.
"""
# Convert to lower case
cname = [name.lower() for name in cname] if isinstance(cname, list) else cname.lower()
name = cname.lower()
# Define the regex patterns
tme_pattern = re.compile(r"t\.me/s/")
extract_pattern = re.compile(r"(?<=t\.me/)[a-zäöüß0-9_]+")
sanitize_pattern = re.compile(r"[a-zäöüß0-9_]+")
if tme_pattern.search(name):
n = extract_pattern.search(name).group(0)
else:
n = sanitize_pattern.search(name).group(0)
return n
def process_name(name):
if tme_pattern.search(name):
return extract_pattern.search(name).group(0)
else:
return sanitize_pattern.search(name).group(0)
if isinstance(cname, list):
return [process_name(name) for name in cname]
else:
return process_name(cname)
def save_url(fname, name, mdir="./media"):
# The media URLs often carry an extra parameter, so match not only
# "something.ogg" but also "something.mp4?something_else"
content_ext = re.search(r"\.[a-zA-Z0-9]+(?=\?|$)",fname).group(0)
content_file = f"{mdir}/{name}{content_ext}"
try:
os.makedirs(os.path.dirname(content_file), exist_ok=True)
except:
print(f"Kann kein Media-Directory in {mdir} öffnen")
return None
try:
with open(content_file, 'wb') as f:
f.write(requests.get(fname).content)
return content_file
except:
print(f"Kann Datei {content_file} nicht schreiben")
return None
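# Illustrative example (URL is hypothetical): for
# fname = "https://cdn4.telegram-cdn.org/file/abc.mp4?token=xyz"
# the regex above extracts ".mp4", so save_url(fname, "telegram_46_video")
# writes ./media/telegram_46_video.mp4 and returns that path (or None on failure).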
def get_channel_from_url(channel:str):
return re.search(r"(?<=t\.me\/).+(?=\/[0-9])",channel).group(0)
def tg_post_parse(b, save = True, describe = True):
# Always present:
# post number and timestamp (even though it shows up as datetime in single posts and as time on channel pages)
b_nr = int(re.search(r'[0-9]+$', b.select_one("a.tgme_widget_message_date")['href']).group())
if b.select_one("time.time") is not None:
timestamp = datetime.fromisoformat(b.select_one("time.time")['datetime'])
else: # Einzel-Post
timestamp = datetime.fromisoformat(b.select_one("time.datetime")['datetime'])
#
if b.select_one("span.tgme_widget_message_views") is not None:
views = extract_k(b.select_one("span.tgme_widget_message_views").get_text())
else:
views = None
if b.select_one("a.tgme_widget_message_date"):
post_url = b.select_one("a.tgme_widget_message_date")['href']
channel = get_channel_from_url(post_url)
else:
post_url = None
textlinks = b.select("div.tgme_widget_message_text a")
links = [a['href'] for a in textlinks if a['href'].startswith("http")]
hashtags = [a['href'][3:] for a in textlinks if a['href'].startswith("?q=")]
### The possible content sections of a post ###
# Text
if b.select_one("div.tgme_widget_message_text_wrap") is not None:
text = b.select_one("div.tgme_widget_message_text").get_text()
else:
text = None
# Sticker (example: https://t.me/telegram/23)
if b.select_one("div.tgme_widget_message_sticker_wrap") is not None:
sticker_url = b.select_one("i.tgme_widget_message_sticker")['data-webp']
sticker = {'url': sticker_url,
'image': base64.b64encode(requests.get(sticker_url).content).decode('utf-8')
}
if describe:
# GPT4o-mini understands JPG, PNG, non-animated GIF... and WEBP.
sticker['description'] = gpt4_description(sticker_url)
if save:
sticker['file'] = save_url(sticker_url, f"{channel}_{b_nr}_sticker")
else:
sticker = None
# Photo URL
if b.select_one("a.tgme_widget_message_photo_wrap") is not None:
photo_url = re.search(r"(?<=image\:url\(\').+(?=\')", b.select_one("a.tgme_widget_message_photo_wrap")['style']).group(0)
photo = {'url': photo_url,
'image': base64.b64encode(requests.get(photo_url).content).decode('utf-8')
}
if describe:
photo['description'] = gpt4_description(f"data:image/jpeg;base64,{photo['image']}")
if save:
photo['file'] = save_url(photo_url, f"{channel}_{b_nr}_photo")
else:
photo = None
# Voice message tgme_widget_message_voice https://t.me/fragunsdochDasOriginal/27176
if b.select_one('audio.tgme_widget_message_voice') is not None:
# Link to the OGG file
voice_url = b.select_one('audio.tgme_widget_message_voice')['src']
voice_duration = b.select_one('time.tgme_widget_message_voice_duration').get_text()
voice = {'url': voice_url,
'duration': voice_duration}
# Always keep a local copy for transcription
if save or describe:
voice['file'] = save_url(voice_url, f"{channel}_{b_nr}_voice")
if describe:
voice['transcription'] = transcribe(voice['file'])
else:
voice = None
# Video URL (example: https://t.me/telegram/46)
if b.select_one('video.tgme_widget_message_video') is not None:
video_url = b.select_one('video.tgme_widget_message_video')['src']
if b.select_one('i.tgme_widget_message_video_thumb') is not None:
video_thumbnail = re.search(r"(?<=image\:url\(\').+(?=\')", b.select_one('i.tgme_widget_message_video_thumb')['style']).group(0)
video = {'url': video_url,
'thumbnail': video_thumbnail,
'image': base64.b64encode(requests.get(video_thumbnail).content).decode('utf-8')
}
else:
video = {'url': video_url,
}
if save or describe:
video['file'] = save_url(video_url, f"{channel}_{b_nr}_video")
if describe:
video['transcription'] = transcribe(video['file'])
if 'image' in video:
video['description'] = gpt4_description(f"data:image/jpeg;base64,{video['image']}")
#################### HERE BE DRAGONS #####################
# All the untested functions follow here - they are just Mistral
# translations/rewrites of the R stuff.
def tgc_url(cname, nr):
"""
Helper function returning a Telegram channel post URL.
Parameters:
cname (str): Telegram channel name or URL.
nr (int): Post number.
Returns:
str: URL.
"""
cname = cname.lower()
match = re.search(r"[a-zäöüß0-9_]+", cname)
if match:
return f"https://t.me/s/{match.group(0)}/{nr}"
return None
else:
video = None
# Document / Audio URL? https://t.me/telegram/35
# Link-Preview: https://t.me/s/telegram/15
# Example usage
# test_list = tgc_blockread("telegram", nr=1)
# test_list = tgc_blockread("telegram")
# Forwarded
if b.select_one("a.tgme_widget_message_forwarded_from_name") is not None:
forward_url = b.select_one("a.tgme_widget_message_forwarded_from_name")['href']
forward_name = b.select_one("a.tgme_widget_message_forwarded_from_name").get_text()
forward = {
'url': forward_url,
'name': forward_name,
}
else:
forward = None
post_dict = {
'channel': channel,
'nr': b_nr,
'url': post_url,
'views': views, # snapshot at the time of reading!
'timedate': timestamp,
'text': text,
'photo': photo,
'sticker': sticker,
'video': video,
'voice': voice,
'forwards': forward,
'links': links,
'hashtags': [f"#{tag}" for tag in hashtags],
}
return post_dict
def tgc_read(cname, nr, save=True, describe = False):
c = tgc_clean(cname)
channel_url = f"https://t.me/{c}/{nr}"
return tgc_read_url(channel_url, save, describe)
def tgc_read_url(channel_url, save=True, describe = False):
# Reads a single post from its URL
# Supposes that the URL is well-formed.
channel_url += "?embed=1&mode=tme"
response = requests.get(channel_url)
response.raise_for_status()
tgm = BeautifulSoup(response.content, 'html.parser')
# Error message?
if tgm.select_one("div.tgme_widget_message_error") is not None:
print(f"Fehler beim Lesen von {channel_url}")
return None
b = tgm.select_one("div.tgme_widget_message")
return tg_post_parse(b, save, describe)
def tgc_blockread(cname="telegram", nr=None, save=True):
def tgc_blockread(cname="telegram", nr=None, save=True, describe=False):
"""
Reads a block of posts from the channel - normally 16 are displayed.
If single parameter is set, read only the post nr; return empty if it
does not exist.
Parameters:
cname (str): Channel name as a string (non-name characters are stripped).
nr (int, optional): Number where the block is centered. If none is given, read last post.
save (bool, default True): Saves images to an image folder.
describe (bool, default False): Transcribes/describes media content
single (bool, default False): Return a single post rather than up to 16
Returns:
list of dict: A list of dictionaries consisting of up to 16 rows for each post.
list of dict: A list of dictionaries consisting of up to 16 posts.
"""
if nr is None:
nr = ""
nr = "" # Without a number, the most recent page/post is shown
else:
nr = int(nr)
cname = tgc_clean(cname)
tgc_url_ = tgc_url(cname, nr)
response = requests.get(tgc_url_)
c = tgc_clean(cname)
# Fetch only a single post? Then t.me/<channel>/<nr>,
# otherwise t.me/s/<channel>/<nr>
channel_url = f"https://t.me/s/{c}/{nr}"
response = requests.get(channel_url)
response.raise_for_status()
tgm = BeautifulSoup(response.content, 'html.parser')
block = tgm.select("div.tgme_widget_message_wrap")
block_list = []
for b in block:
b_nr = int(re.search(r'[0-9]+$', b.select_one("a.tgme_widget_message_date")['href']).group())
forward = b.select_one("a.tgme_widget_message_forwarded_from_name")
forward_url = forward['href'] if forward else None
textlinks = b.select("div.tgme_widget_message_text a")
links = [a['href'] for a in textlinks if a['href'].startswith("http")]
hashtags = [a['href'][3:] for a in textlinks if a['href'].startswith("?q=")]
photo_url_match = re.search(r"(?<=image\:url\('\)).+(?=\')", b.select_one("a.tgme_widget_message_photo_wrap")['style'])
photo_url = photo_url_match.group(0) if photo_url_match else None
post_dict = {
'name': cname,
'nr': b_nr,
'url': b.select_one("a.tgme_widget_message_date")['href'],
'timedate': pd.to_datetime(b.select_one("time.time")['datetime']),
'text': b.select_one("div.tgme_widget_message_text").get_text(),
'views': int(re.sub(r'[KMB]$', lambda m: {'K': 'e+03', 'M': 'e+06', 'B': 'e+09'}[m.group()], b.select_one("span.tgme_widget_message_views").get_text())),
'forwards': forward_url,
'links': links,
'hashtags': [f"#{tag}" for tag in hashtags],
'photo': photo_url
}
if save and photo_url:
photo_file_search_string = r'\.[a-zA-Z]+$'
photo_file = f"./media/{cname}_post_{b_nr}{re.search(photo_file_search_string, photo_url).group(0)}"
os.makedirs(os.path.dirname(photo_file), exist_ok=True)
with open(photo_file, 'wb') as f:
f.write(requests.get(photo_url).content)
block_list.append(post_dict)
block = tgm.select("div.tgme_widget_message_wrap")
block_list = [tg_post_parse(b, save, describe) for b in block]
return block_list
# Examples:
# test_list = tgc_collect("telegram")
# test_list = tgc_collect("telegram", first=1)
# test_list = tgc_collect("telegram", -100)
def tgc_collect(cname, first=1, save=False):
"""
Collect hashtags, keywords, and links from a Telegram channel.
Parameters:
cname (str): Channel name to crawl.
first (int): Earliest number of blocks to read (0 = all, negative reads number of posts).
save (bool, default False): Saves images to an image folder.
Returns:
list of dict: A list of dictionaries containing the posts in ascending order.
"""
collect_list = tgc_blockread(cname, save=save)
min_nr = min(post['nr'] for post in collect_list)
max_nr = max(post['nr'] for post in collect_list)
if first < 1:
first = max_nr + first + 1
if first == 0:
first = 1
while first < min_nr:
block_list = tgc_blockread(cname, min_nr - 8, save=save)
block_list = [post for post in block_list if post['nr'] < min_nr]
collect_list = block_list + collect_list
min_nr = min(post['nr'] for post in block_list)
print(".", end="")
print(f"\nRead {len(collect_list)} posts\n")
return [post for post in collect_list if post['nr'] >= first]
def tgc_read_range(cname, n1=1, n2=None, save=True, describe = True):
# Reads a range of posts
# First: get the number of the most recent post
profile = tgc_profile(cname)
# Sanity check: has the first requested post number even been posted yet?
max_nr = profile['n_posts']
if n1 > max_nr:
return None
if n2 is None:
n2 = max_nr
loop = True
posts = []
while loop:
new_posts = tgc_blockread(cname, n1, save, describe)
nr_values = [post['nr'] for post in new_posts]
last_nr = max(nr_values)
posts.extend(new_posts)
# Stop conditions: reached the channel's last post, or the end of the requested range
loop = (last_nr < max_nr) and (last_nr < n2)
# Advance to the next block; the block page is roughly centered on nr (cf. tgc_collect)
n1 = last_nr + 8
return posts
\ No newline at end of file
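For reference, a minimal sketch of how tgc_read_range is meant to be called (channel name and range are examples, mirroring the test script above):

profile = tgc_profile("telegram")
last = profile['n_posts']
# Read the 20 most recent posts without saving media or describing them
posts = tgc_read_range("telegram", last - 19, last, save=False, describe=False)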
# transcribe.py
#
# Turn images into image descriptions with GPT4o-mini,
# transcribe audios/videos locally with whisper
import ollama
from openai import OpenAI
from pathlib import Path
import os
import base64
import whisper
prompt = """Du bist Barrierefreiheits-Assistent.
Du erstellst eine deutsche Bildbeschreibung für den Alt-Text.
@@ -51,28 +56,88 @@ def llama_description(b64_image):
return response['message']['content'].strip()
def ai_description(fname):
# Use llama3.2-vision to describe images
# Use whisper.cpp command-line tool to transcribe audio and video
desc = f"Filetype: {fname.lower()[-4:]}"
image_folder = os.path.join(os.path.dirname(__file__), 'messages')
file_path = os.path.join(image_folder, fname)
file_path = os.path.join(image_folder, fname)
if fname.lower().endswith(('.jpg', '.jpeg')):
try:
with open(file_path, 'rb') as file:
file_content = file.read()
image = base64.b64encode(file_content).decode('utf-8')
except FileNotFoundError:
return "!!!Datei nicht gefunden!!!"
except Exception as e:
raise Exception(f"Error reading file {fname}: {str(e)}")
if OLLAMA:
desc2 = llama_description(image)
else:
desc2 = gpt4_description(image)
desc2 = gpt4_description(image)
desc = f"{desc}\n{desc2}"
def ai_description(image):
if OLLAMA:
desc2 = llama_description(image)
else:
desc2 = gpt4_description(image)
desc2 = gpt4_description(image)
# Return ai-generated description
return desc
return desc2
def transcribe(audio):
# Wrapper; calls one of the three whisper transcription variants.
# Favourite: the accelerated whisper-s2t
# (which first needs CTranslate2 with METAL support on the Mac,
# or CUDA on Windows machines)
try:
text = transcribe_whisper(audio)
# return transcribe_jax(audio)
# return transcribe_ws2t(audio)
return text
except:
return ""
def transcribe_whisper(fname, model="large-v3-turbo"):
# Vanilla Whisper. Possibly not the fastest solution.
stt = whisper.load_model(model)
result = stt.transcribe(fname)
return result['text']
def transcribe_jax(audio):
# Does not use the standard whisper library for transcription but the improved
# JAX port - which is very slow on the first run because it first has to download
# and compile things; after that it runs faster.
# Install with:
# pip install git+https://github.com/sanchit-gandhi/whisper-jax.git
# Also import jax itself?
#
# Project page: https://github.com/sanchit-gandhi/whisper-jax
#
# This applied to Whisper; still to be checked for whisper-jax:
# models are stored under ~/.cache/whisper/; since Whisper models are already
# downloaded on my Mac, I use the additional parameter
# download_root="{path to the directory to download models}"
from whisper_jax import FlaxWhisperPipline
# instantiate pipeline
pipeline = FlaxWhisperPipline("openai/whisper-large-v3-turbo")
text = pipeline(audio)
return text
import os
import whisper_s2t
def transcribe_ws2t(file_path, model_name="large-v3-turbo", output_format="txt"):
"""
Transcribe an audio/video file using WhisperS2T.
Args:
file_path (str): Path to the .ogg or .mp4 file.
model_name (str): Whisper model to use (e.g., "small", "medium", "large").
output_format (str): Output format ("txt" or "json").
Returns:
list: Raw WhisperS2T output (one list of utterance segments per input file).
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
# Initialize the WhisperS2T pipeline (uses the model_name passed in)
model = whisper_s2t.load_model(model_identifier=model_name, backend='CTranslate2')
files = [file_path]
lang_codes = ['de']
tasks = ['transcribe']
initial_prompts = [None]
out = model.transcribe_with_vad(files,
lang_codes=lang_codes,
tasks=tasks,
initial_prompts=initial_prompts,
batch_size=24)
return out
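# Illustrative use of the transcribe() wrapper above (the file path is hypothetical):
# text = transcribe("./media/telegram_46_video.mp4")
# print(text)  # returns "" if the transcription backend fails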