diff --git a/.DS_Store b/.DS_Store
index d077d34ad8ca5a9e1c7d729732838e604e2eda39..d8beb1756d7e8ba14b58837f7e417c386ec707f2 100644
Binary files a/.DS_Store and b/.DS_Store differ
diff --git a/main_tg.py b/main_tg.py
index ac40ef9bda5ac3be1adf3b593f6a2a19abf25dfa..185f8187cc059ceedd6ef09dd2806e7405d05b3a 100644
--- a/main_tg.py
+++ b/main_tg.py
@@ -2,6 +2,22 @@ from src.aichecker.tg_check import *
 
 if __name__ == "__main__":
     # Bluesky-Check
-    handle_str = input("Handle des Kanals eingeben: ")
+    #handle_str = input("Handle des Kanals eingeben: ")
+    handle_str = "telegram"
     channels_dict = tgc_profile(handle_str)
-    print(channels_dict)
\ No newline at end of file
+    last_post = channels_dict['n_posts']
+    print(channels_dict)
+    # Lies eine Seite (mit bis zu 16 Posts), ohne Mediendateien anzulegen
+    # und ohne Audios zu transkribieren
+    posts = tgc_blockread(channels_dict['name'], nr=1, save=False, describe=False)
+    print(posts)
+    # Jetzt die aktuellsten Posts, mit Transkription/Mediendateien
+    #posts = tgc_read(channels_dict['name'],nr=None, save=True, transcribe=True)
+    #print(posts)
+    # Nur ein einzelner Post
+    posts = tgc_read(channels_dict['name'], nr=last_post)
+    print(posts)
+    # Über die Post-URL
+    print(tgc_read_url('https://t.me/telegram/46', save=True, describe=True))
+    posts = tgc_read_range(channels_dict['name'], last_post - 19, last_post, save=True, describe=True)
+    print("Ende")
\ No newline at end of file
diff --git a/media/.DS_Store b/media/.DS_Store
new file mode 100644
index 0000000000000000000000000000000000000000..5008ddfcf53c02e82d7eee2e57c38e5672ef89f6
Binary files /dev/null and b/media/.DS_Store differ
diff --git a/media/telegram_46_video.mp4 b/media/telegram_46_video.mp4
new file mode 100644
index 0000000000000000000000000000000000000000..e0348791bea418311d5c06e596dcc39806437f16
Binary files /dev/null and b/media/telegram_46_video.mp4 differ
diff --git a/media/wilhelmkachel_1076_photo.jpg b/media/wilhelmkachel_1076_photo.jpg
new file mode 100644
index 0000000000000000000000000000000000000000..df28ea47f1b923d882ad58db16e5c6f7ed4f5d5e
Binary files /dev/null and b/media/wilhelmkachel_1076_photo.jpg differ
diff --git a/src/aichecker/README-tg.md b/src/aichecker/README-tg.md
new file mode 100644
index 0000000000000000000000000000000000000000..fd53596c0b6234e3cf8c4c8417bcf1af5bc06d83
--- /dev/null
+++ b/src/aichecker/README-tg.md
@@ -0,0 +1,27 @@
+# Telegram-Posts lesen
+
+Von Channeln kann man Posts auf zwei Arten lesen:
+
+- über ihre Kontextseite (t.me/s/<channel>/<id>)
+- über ihre individuelle Post-Seite (t.me/<channel>/<id>)
+
+## Kontextseite
+
+Die Kontextseite ist etwas bequemer, denn:
+
+- sie lädt direkt komplett (die Post-Seite lädt ein Embed nach)
+- sie wird auch angezeigt, wenn es die Post-ID nicht gibt
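+Zur Illustration eine kleine Skizze zum Lesen der Kontextseite (Annahme: requests und
+beautifulsoup4 sind installiert, wie im übrigen Projekt; der Kanal "telegram" ist nur ein Beispiel.
+Die Einzelpost-Variante wird im nächsten Abschnitt beschrieben):
+
+```python
+import requests
+from bs4 import BeautifulSoup
+
+# Kontextseite laden - sie zeigt einen Block von bis zu ca. 16 Posts des Kanals
+html = requests.get("https://t.me/s/telegram").content
+soup = BeautifulSoup(html, "html.parser")
+for msg in soup.select("div.tgme_widget_message_wrap"):
+    link = msg.select_one("a.tgme_widget_message_date")["href"]
+    text = msg.select_one("div.tgme_widget_message_text")
+    print(link, text.get_text()[:80] if text else "")
+```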
+## Postseite
+
+Die Postseite lädt die eigentlichen Inhalte als Embed in ein iframe.
+Es ist möglich, dieses Embed direkt über requests zu laden:
+
+- https://t.me/<channel>/<id>?embed=1&mode=tme
+  - Parameter embed: alles >0 scheint zu funktionieren
+  - Parameter mode: akzeptiert auch andere Namen
+- Untersuchen, ob die Antwort ein Element div.tgme_widget_message_error enthält - dann konnte der Post nicht geladen werden
+- Beispiel für das Message-Element eines erfolgreich geladenen Posts:
+
+<div class="tgme_widget_message text_not_supported_wrap js-widget_message" data-post="telegram/361" data-view="eyJjIjotMTAwNTY0MDg5MiwicCI6MzYxLCJ0IjoxNzM1OTQxNTY5LCJoIjoiOGJmNWMzZDM1OTE0Y2I1NTMyIn0" data-peer="c1005640892_-6044378432856379164" data-peer-hash="556f33b85ddb50a1e1" data-post-id="361">
\ No newline at end of file
diff --git a/src/aichecker/__init__.py b/src/aichecker/__init__.py
index f5228614906c6f1a7b13460fa5d2664b8ff7c191..bab51f32977b49229f9f453038196f1a8e9b38f5 100644
--- a/src/aichecker/__init__.py
+++ b/src/aichecker/__init__.py
@@ -1,5 +1,5 @@
 from .check_bsky import *
-from .bildbeschreibung import ai_description
+from .transcribe import ai_description
 from .detectora import query_detectora
 from .imagecheck import query_aiornot
-from .tg_check import tgc_clean, tgc_url, tgc_blockread, tgc_collect, tgc_profile
\ No newline at end of file
+from .tg_check import tgc_clean, tgc_read, tgc_blockread, tgc_read_url, tgc_profile
\ No newline at end of file
diff --git a/src/aichecker/bildbeschreibung.py b/src/aichecker/bildbeschreibung.py
deleted file mode 100644
index 924fb1836fd5e58bec647d2b86be8ba2e55a4b50..0000000000000000000000000000000000000000
--- a/src/aichecker/bildbeschreibung.py
+++ /dev/null
@@ -1,78 +0,0 @@
-import ollama
-from openai import OpenAI
-from pathlib import Path
-import os
-import base64
-
-prompt = """Du bist Barrierefreiheits-Assistent.
-Du erstellst eine deutsche Bildbeschreibung für den Alt-Text.
-Beschreibe, was auf dem Bild zu sehen ist.
-Beginne sofort mit der Beschreibung. Sei präzise und knapp.
-Du erstellst eine deutsche Bildbeschreibung für den Alt-Text.
-Beschreibe, was auf dem Bild zu sehen ist.
-Beginne sofort mit der Beschreibung. Sei präzise und knapp.
-Wenn das Bild lesbaren Text enthält, zitiere diesen Text.""" -client = OpenAI(api_key = os.environ.get('OPENAI_API_KEY')) -# Use GPT-4 mini to describe images -OLLAMA = False - -def gpt4_description(image_url): - # Check a local image by converting it to b64: - # image_url = f"data:image/jpeg;base64,{b64_image}" - response = client.chat.completions.create( - model="gpt-4o-mini", - messages=[ - { - "role": "user", - "content": [ - {"type": "text", "text": prompt}, - { - "type": "image_url", - "image_url": { - "url": image_url, - } - }, - ], - } - ], - max_tokens=300, - ) - return response.choices[0].message.content - -def llama_description(b64_image): - response = ollama.chat( - model="llama3.2-vision", - messages=[{ - 'role': 'user', - 'content': prompt, - 'images': [b64_image] - }] - ) - return response['message']['content'].strip() - - -def ai_description(fname): - # Use llama3.2-vision to describe images - # Use whisper.cpp command-line tool to transcribe audio and video - desc = f"Filetype: {fname.lower()[-4:]}" - image_folder = os.path.join(os.path.dirname(__file__), 'messages') - file_path = os.path.join(image_folder, fname) - file_path = os.path.join(image_folder, fname) - if fname.lower().endswith(('.jpg', '.jpeg')): - try: - with open(file_path, 'rb') as file: - file_content = file.read() - image = base64.b64encode(file_content).decode('utf-8') - except FileNotFoundError: - return "!!!Datei nicht gefunden!!!" - except Exception as e: - raise Exception(f"Error reading file {fname}: {str(e)}") - if OLLAMA: - desc2 = llama_description(image) - else: - desc2 = gpt4_description(image) - desc2 = gpt4_description(image) - desc = f"{desc}\n{desc2}" - - # Return ai-generated description - return desc diff --git a/src/aichecker/check_bsky.py b/src/aichecker/check_bsky.py index 878c309263d6a2acd99ce46ab8974de9a539aa6f..f9a4cca548a7c5a4e4f7aafbfdec6bcda75e7631 100644 --- a/src/aichecker/check_bsky.py +++ b/src/aichecker/check_bsky.py @@ -7,7 +7,7 @@ import json import pandas as pd from .detectora import query_detectora from .imagecheck import query_aiornot -from .bildbeschreibung import gpt4_description +from .transcribe import gpt4_description import requests import os diff --git a/src/aichecker/imagecheck.py b/src/aichecker/imagecheck.py index c16cad412831dc16b931c1c1930e91c591fd4230..9a00a30066786331bbe1df2c39c522ac7843e6ef 100644 --- a/src/aichecker/imagecheck.py +++ b/src/aichecker/imagecheck.py @@ -1,6 +1,6 @@ # imagecheck.py # Erfragt KI-Wahrscheinlichkeit für ein Bild über Hive- und AIorNot-API -from .bildbeschreibung import ai_description +from .transcribe import ai_description import requests import json diff --git a/src/aichecker/tg_check.py b/src/aichecker/tg_check.py index 4c2c54d548e6399d284b29d38f33801c285b394c..56ce98099c0d1f37a339256c1f87b7b84cda4d02 100644 --- a/src/aichecker/tg_check.py +++ b/src/aichecker/tg_check.py @@ -1,9 +1,8 @@ # tg_check.py # -# Mistral-Übersetzung aus R (mein altes Rtgchannels-Projekt V0.11) +# Mistral-Übersetzung aus R (mein altes Rtgchannels-Projekt V0.1.1) # Angepasst auf Listen statt Dataframes # -# Noch nicht alles getestet und umgeschrieben # 1-2025 Jan Eggers @@ -13,22 +12,18 @@ from bs4 import BeautifulSoup from datetime import datetime import os import re +import base64 +from .transcribe import gpt4_description, transcribe def extract_k(n_str: str): - if n_str.endswith('K'): - try: - # Zahlen wie '5.06K', '1K' - n = int(float(n_str[:-1]) * 1000) - except: - return None - else: - try: - n = int(n_str) - except: - return None - return n + 
try: + # Zahlen wie '5.06K', '1K', '1.2M' + n_f = float(re.sub(r'[KMB]$', lambda m: {'K': 'e+03', 'M': 'e+06', 'B': 'e+09'}[m.group()], n_str)) + return int(n_f) + except: + return None -def tgc_profile(channel="ffmfreiheit"): +def tgc_profile(channel="telegram"): """ Generates base statistics for a Telegram channel. @@ -51,203 +46,255 @@ def tgc_profile(channel="ffmfreiheit"): except requests.exceptions.RequestException: print(f"Warning: Channel {c} not found") return None - channel_info = {} + if tgm.select_one("div.tgme_channel_info_description") is not None: + description = tgm.select_one("div.tgme_channel_info_description").get_text() + else: + description = None + channel_info = {'name': c, + 'description': description} for info_counter in tgm.find_all('div', class_='tgme_channel_info_counter'): counter_value = info_counter.find('span', class_='counter_value').text.strip() counter_type = info_counter.find('span', class_='counter_type').text.strip() channel_info[counter_type] = extract_k(counter_value) - - return channel_info - -""" - # Read values from the info card - counter_type = [span.get_text() for span in tgm.select('div.tgme_channel_info_counter span.counter_type')] - counter_values = [extract_k(re.sub(r'[KMB]$', lambda m: {'K': 'e+03', 'M': 'e+06', 'B': 'e+09'}[m.group()], span.get_text())) - for span in tgm.select('div.tgme_channel_info_counter span.counter_value')] - - df = pd.DataFrame({'name': counter_type, 'values': counter_values}).pivot(index=None, columns='name', values='values').reset_index(drop=True) - - # Add id, description, title - df['id'] = c - df['title'] = tgm.select_one('div.tgme_channel_info_header_title').get_text() - df['description'] = tgm.select_one('div.tgme_channel_info_description').get_text() - # The last post is visible on this page. Gather its number. - last_post_href = tgm.select('a.tgme_widget_message_date')[-1]['href'] - df['last_post_n'] = int(re.search(r'[0-9]+$', last_post_href).group()) + # The last post is visible on this page. Gather its number and date. + last_post_href = tgm.select('a.tgme_widget_message_date')[-1]['href'] + channel_info['n_posts'] = int(re.search(r'[0-9]+$', last_post_href).group()) - df['last_post_datetime'] = pd.to_datetime(tgm.select('time.time')[-1]['datetime']) - - # Now get the first post. - tgm_firstpost = BeautifulSoup(requests.get(f"{c_url}/1").content, 'html.parser') - df['created'] = pd.to_datetime(tgm_firstpost.select_one('time')['datetime']) - - # Calculate posts per week - df['post_per_week'] = df['last_post_n'] / ((datetime.now() - df['created']).days / 7) - - if channels_df is None: - channels_df = df - else: - channels_df = pd.concat([channels_df, df], ignore_index=True) + return channel_info - return channels_df -""" def tgc_clean(cname): """ Helper function returning a sanitized Telegram channel name in lowercase. Parameters: - cname (str or list): Telegram channel name or URL. + cname (str): Telegram channel name or URL. Returns: - str or list: Lower-case of the extracted channel name. + str: Lower-case of the extracted channel name. 
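+
+    Example:
+        >>> tgc_clean("@Telegram")
+        'telegram'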
""" # Convert to lower case - cname = [name.lower() for name in cname] if isinstance(cname, list) else cname.lower() - + name = cname.lower() + # Define the regex patterns tme_pattern = re.compile(r"t\.me/s/") extract_pattern = re.compile(r"(?<=t\.me/)[a-zäöüß0-9_]+") sanitize_pattern = re.compile(r"[a-zäöüß0-9_]+") + if tme_pattern.search(name): + n = extract_pattern.search(name).group(0) + else: + n = sanitize_pattern.search(name).group(0) + + return n - def process_name(name): - if tme_pattern.search(name): - return extract_pattern.search(name).group(0) +def save_url(fname, name, mdir="./media"): + # Die Medien-URLs bekommen oft einen Parameter mit übergeben; deswegen nicht nur + # "irgendwas.ogg" berücksichtigen, sondern auch "irgendwas.mp4?nochirgendwas" + content_ext = re.search("\.[a-zA-Z0-9]+(?=\?|$)",fname).group(0) + content_file = f"{mdir}/{name}{content_ext}" + try: + os.makedirs(os.path.dirname(content_file), exist_ok=True) + except: + print(f"Kann kein Media-Directory in {mdir} öffnen") + return None + try: + with open(content_file, 'wb') as f: + f.write(requests.get(fname).content) + return content_file + except: + print(f"Kann Datei {content_file} nicht schreiben") + return None + +def get_channel_from_url(channel:str): + return re.search(r"(?<=t\.me\/).+(?=\/[0-9])",channel).group(0) + +def tg_post_parse(b, save = True, describe = True): + # Immer vorhanden: + # Postnummer, Zeitstempel (auch wenn er in Einzel-Posts als datetime auftaucht und in Channel_seiten als time) + b_nr = int(re.search(r'[0-9]+$', b.select_one("a.tgme_widget_message_date")['href']).group()) + if b.select_one("time.time") is not None: + timestamp = datetime.fromisoformat(b.select_one("time.time")['datetime']) + else: # Einzel-Post + timestamp = datetime.fromisoformat(b.select_one("time.datetime")['datetime']) + # + if b.select_one("span.tgme_widget_message_views") is not None: + views = extract_k(b.select_one("span.tgme_widget_message_views").get_text()) + else: + views = None + if b.select_one("a.tgme_widget_message_date"): + post_url = b.select_one("a.tgme_widget_message_date")['href'] + channel = get_channel_from_url(post_url) + else: + post_url = None + textlinks = b.select("div.tgme_widget_message_text a") + links = [a['href'] for a in textlinks if a['href'].startswith("http")] + hashtags = [a['href'][3:] for a in textlinks if a['href'].startswith("?q=")] + ### Die möglichen Content-Abschnitte eines Posts ### + # Text + if b.select_one("div.tgme_widget_message_text_wrap") is not None: + text = b.select_one("div.tgme_widget_message_text").get_text() + else: + text = None + # Sticker (Beispiel: https://t.me/telegram/23) + if b.select_one("div.tgme_widget_message_sticker_wrap") is not None: + sticker_url = b.select_one("i.tgme_widget_message_sticker")['data-webp'] + sticker = {'url': sticker_url, + 'image': base64.b64encode(requests.get(sticker_url).content).decode('utf-8') + } + if describe: + # GPT4o-mini versteht JPG, PNG, nicht animiertes GIF... und WEBP. 
+            sticker['description'] = gpt4_description(sticker_url)
+        if save:
+            sticker['file'] = save_url(sticker_url, f"{channel}_{b_nr}_sticker")
+    else:
+        sticker = None
+    # Photo URL
+    if b.select_one("a.tgme_widget_message_photo_wrap") is not None:
+        photo_url = re.search(r"(?<=image\:url\(\').+(?=\')", b.select_one("a.tgme_widget_message_photo_wrap")['style']).group(0)
+        photo = {'url': photo_url,
+                 'image': base64.b64encode(requests.get(photo_url).content).decode('utf-8')
+                 }
+        if describe:
+            photo['description'] = gpt4_description(f"data:image/jpeg;base64,{photo['image']}")
+        if save:
+            photo['file'] = save_url(photo_url, f"{channel}_{b_nr}_photo")
+    else:
+        photo = None
+    # Sprachnachricht tgme_widget_message_voice https://t.me/fragunsdochDasOriginal/27176
+    if b.select_one('audio.tgme_widget_message_voice') is not None:
+        # Link auf OGG-Datei
+        voice_url = b.select_one('audio.tgme_widget_message_voice')['src']
+        voice_duration = b.select_one('time.tgme_widget_message_voice_duration').get_text()
+        voice = {'url': voice_url,
+                 'duration': voice_duration,
+                 }
+        # Für Transkription immer lokale Kopie anlegen
+        if save or describe:
+            voice['file'] = save_url(voice_url, f"{channel}_{b_nr}_voice")
+        if describe:
+            voice['transcription'] = transcribe(voice['file'])
+
+    else:
+        voice = None
+    # Video URL (Beispiel: https://t.me/telegram/46)
+    if b.select_one('video.tgme_widget_message_video') is not None:
+        video_url = b.select_one('video.tgme_widget_message_video')['src']
+        if b.select_one('.tgme_widget_message_video_thumb') is not None:
+            video_thumbnail = re.search(r"(?<=image\:url\(\').+(?=\')", b.select_one('.tgme_widget_message_video_thumb')['style']).group(0)
+            video = {'url': video_url,
+                     'thumbnail': video_thumbnail,
+                     'image': base64.b64encode(requests.get(video_thumbnail).content).decode('utf-8')
+                     }
         else:
-                return sanitize_pattern.search(name).group(0)
-
-    if isinstance(cname, list):
-        return [process_name(name) for name in cname]
+            video = {'url': video_url,
+                     }
+        if save or describe:
+            video['file'] = save_url(video_url, f"{channel}_{b_nr}_video")
+        if describe:
+            video['transcription'] = transcribe(video['file'])
+            if 'image' in video:
+                video['description'] = gpt4_description(f"data:image/jpeg;base64,{video['image']}")
     else:
-        return process_name(cname)
-
-
-#################### HIER SEYEN DRACHEN #####################
-# All the untested functions follow here - they are just Mistral
-# translations/rewrites of the R stuff.
-
-def tgc_url(cname, nr):
-    """
-    Helper function returning a Telegram channel post URL.
-
-    Parameters:
-    cname (str): Telegram channel name or URL.
-    nr (int): Post number.
-
-    Returns:
-    str: URL.
-    """
-    cname = cname.lower()
-    match = re.search(r"[a-zäöüß0-9_]+", cname)
-    if match:
-        return f"https://t.me/s/{match.group(0)}/{nr}"
-    return None
-
-
-
+        video = None
+    # Document / Audio URL? https://t.me/telegram/35
+    # Link-Preview: https://t.me/s/telegram/15
+
-# Example usage
-# test_list = tgc_blockread("telegram", nr=1)
-# test_list = tgc_blockread("telegram")
+    # Forwarded
+    if b.select_one("a.tgme_widget_message_forwarded_from_name") is not None:
+        forward_url = b.select_one("a.tgme_widget_message_forwarded_from_name")['href']
+        forward_name = b.select_one("a.tgme_widget_message_forwarded_from_name").get_text()
+        forward = {
+            'url': forward_url,
+            'name': forward_name,
+        }
+    else:
+        forward = None
+
+
+    post_dict = {
+        'channel': channel,
+        'nr': b_nr,
+        'url': post_url,
+        'views': views, # Momentaufnahme!
+        'timedate': timestamp,
+        'text': text,
+        'photo': photo,
+        'sticker': sticker,
+        'video': video,
+        'voice': voice,
+        'forwards': forward,
+        'links': links,
+        'hashtags': [f"#{tag}" for tag in hashtags],
+    }
+    return post_dict
+
+def tgc_read(cname, nr, save=True, describe=False):
+    c = tgc_clean(cname)
+    channel_url = f"https://t.me/{c}/{nr}"
+    return tgc_read_url(channel_url, save=save, describe=describe)
+
+def tgc_read_url(channel_url, save=True, describe=False):
+    # Reads a single post from its URL
+    # Supposes that the URL is well-formed.
+    channel_url += "?embed=1&mode=tme"
+    response = requests.get(channel_url)
+    response.raise_for_status()
+    tgm = BeautifulSoup(response.content, 'html.parser')
+    # Error message?
+    if tgm.select_one("div.tgme_widget_message_error") is not None:
+        print(f"Fehler beim Lesen von {channel_url}")
+        return None
+    b = tgm.select_one("div.tgme_widget_message")
+    return tg_post_parse(b, save, describe)
 
-def tgc_blockread(cname="telegram", nr=None, save=True):
+def tgc_blockread(cname="telegram", nr=None, save=True, describe=False):
     """
     Reads a block of posts from the channel - normally 16 are displayed.
+    If nr is given, the block around this post number is read; the context
+    page also loads if that particular post does not exist.
 
     Parameters:
     cname (str): Channel name as a string (non-name characters are stripped).
     nr (int, optional): Number where the block is centered. If none is given, read last post.
     save (bool, default True): Saves images to an image folder.
+    describe (bool, default False): Transcribes/describes media content
 
     Returns:
-    list of dict: A list of dictionaries consisting of up to 16 rows for each post.
+    list of dict: A list of dictionaries consisting of up to 16 posts.
     """
     if nr is None:
-        nr = ""
+        nr = ""   # Without a number, the most recent page/post is shown
     else:
         nr = int(nr)
-    cname = tgc_clean(cname)
-    tgc_url_ = tgc_url(cname, nr)
-
-    response = requests.get(tgc_url_)
+    c = tgc_clean(cname)
+    # Nur einen Post holen?
Dann t.me/<channel>/<nr>, + # sonst t.me/s/<channel>/<nr> + channel_url = f"https://t.me/s/{c}/{nr}" + response = requests.get(channel_url) response.raise_for_status() tgm = BeautifulSoup(response.content, 'html.parser') - block = tgm.select("div.tgme_widget_message_wrap") - block_list = [] - - for b in block: - b_nr = int(re.search(r'[0-9]+$', b.select_one("a.tgme_widget_message_date")['href']).group()) - forward = b.select_one("a.tgme_widget_message_forwarded_from_name") - forward_url = forward['href'] if forward else None - - textlinks = b.select("div.tgme_widget_message_text a") - links = [a['href'] for a in textlinks if a['href'].startswith("http")] - hashtags = [a['href'][3:] for a in textlinks if a['href'].startswith("?q=")] - - photo_url_match = re.search(r"(?<=image\:url\('\)).+(?=\')", b.select_one("a.tgme_widget_message_photo_wrap")['style']) - photo_url = photo_url_match.group(0) if photo_url_match else None - - post_dict = { - 'name': cname, - 'nr': b_nr, - 'url': b.select_one("a.tgme_widget_message_date")['href'], - 'timedate': pd.to_datetime(b.select_one("time.time")['datetime']), - 'text': b.select_one("div.tgme_widget_message_text").get_text(), - 'views': int(re.sub(r'[KMB]$', lambda m: {'K': 'e+03', 'M': 'e+06', 'B': 'e+09'}[m.group()], b.select_one("span.tgme_widget_message_views").get_text())), - 'forwards': forward_url, - 'links': links, - 'hashtags': [f"#{tag}" for tag in hashtags], - 'photo': photo_url - } - - if save and photo_url: - photo_file_search_string = r'\.[a-zA-Z]+$' - photo_file = f"./media/{cname}_post_{b_nr}{re.search(photo_file_search_string, photo_url).group(0)}" - os.makedirs(os.path.dirname(photo_file), exist_ok=True) - with open(photo_file, 'wb') as f: - f.write(requests.get(photo_url).content) - - block_list.append(post_dict) - + block = tgm.select("div.tgme_widget_message_wrap") + block_list = [tg_post_parse(b, save, describe) for b in block] return block_list -# Examples: -# test_list = tgc_collect("telegram") -# test_list = tgc_collect("telegram", first=1) -# test_list = tgc_collect("telegram", -100) - -def tgc_collect(cname, first=1, save=False): - """ - Collect hashtags, keywords, and links from a Telegram channel. - - Parameters: - cname (str): Channel name to crawl. - first (int): Earliest number of blocks to read (0 = all, negative reads number of posts). - save (bool, default False): Saves images to an image folder. - - Returns: - list of dict: A list of dictionaries containing the posts in ascending order. - """ - collect_list = tgc_blockread(cname, save=save) - min_nr = min(post['nr'] for post in collect_list) - max_nr = max(post['nr'] for post in collect_list) - - if first < 1: - first = max_nr + first + 1 - if first == 0: - first = 1 - - while first < min_nr: - block_list = tgc_blockread(cname, min_nr - 8, save=save) - block_list = [post for post in block_list if post['nr'] < min_nr] - collect_list = block_list + collect_list - min_nr = min(post['nr'] for post in block_list) - print(".", end="") - - print(f"\nRead {len(collect_list)} posts\n") - return [post for post in collect_list if post['nr'] >= first] - - +def tgc_read_range(cname, n1=1, n2=None, save=True, describe = True): + # Liest einen Bereich von Posts + # Zuerst: Nummer des letzten Posts holen + profile = tgc_profile(cname) + # Sicherheitscheck: erste Post-Nummer überhaupt schon gepostet? 
+    max_nr = profile['n_posts']
+    if n1 > max_nr:
+        return None
+    if n2 is None:
+        n2 = max_nr
+    loop = True
+    posts = []
+    nr = n1
+    while loop:
+        new_posts = tgc_blockread(cname, nr, save, describe)
+        nr_values = [post['nr'] for post in new_posts]
+        last_nr = max(nr_values)
+        posts.extend(new_posts)
+        # Abbruchbedingungen: Letzten Post des Channels erreicht, oder Ende des zu lesenden Bereichs
+        loop = not (last_nr >= max_nr or last_nr >= n2)
+        # Nächsten Block oberhalb des bisher letzten gelesenen Posts anfordern
+        nr = last_nr + 8
+
+    # Nur Posts im angefragten Bereich zurückgeben
+    return [post for post in posts if n1 <= post['nr'] <= n2]
\ No newline at end of file
diff --git a/src/aichecker/transcribe.py b/src/aichecker/transcribe.py
new file mode 100644
index 0000000000000000000000000000000000000000..35e1774948b1d417bd43c54ed27a89e55c0c87b9
--- /dev/null
+++ b/src/aichecker/transcribe.py
@@ -0,0 +1,143 @@
+# transcribe.py
+#
+# Bilder mit GPT4o-mini in Bildbeschreibung umwandeln,
+# Audios/Videos lokal mit whisper transkribieren
+
+import ollama
+from openai import OpenAI
+from pathlib import Path
+import os
+import whisper
+
+prompt = """Du bist Barrierefreiheits-Assistent.
+Du erstellst eine deutsche Bildbeschreibung für den Alt-Text.
+Beschreibe, was auf dem Bild zu sehen ist.
+Beginne sofort mit der Beschreibung. Sei präzise und knapp.
+Wenn das Bild lesbaren Text enthält, zitiere diesen Text."""
+client = OpenAI(api_key = os.environ.get('OPENAI_API_KEY'))
+# Use GPT-4 mini to describe images
+OLLAMA = False
+
+def gpt4_description(image_url):
+    # Check a local image by converting it to b64:
+    # image_url = f"data:image/jpeg;base64,{b64_image}"
+    response = client.chat.completions.create(
+        model="gpt-4o-mini",
+        messages=[
+            {
+                "role": "user",
+                "content": [
+                    {"type": "text", "text": prompt},
+                    {
+                        "type": "image_url",
+                        "image_url": {
+                            "url": image_url,
+                        }
+                    },
+                ],
+            }
+        ],
+        max_tokens=300,
+    )
+    return response.choices[0].message.content
+
+def llama_description(b64_image):
+    response = ollama.chat(
+        model="llama3.2-vision",
+        messages=[{
+            'role': 'user',
+            'content': prompt,
+            'images': [b64_image]
+        }]
+    )
+    return response['message']['content'].strip()
+
+
+def ai_description(image):
+    if OLLAMA:
+        desc2 = llama_description(image)
+    else:
+        desc2 = gpt4_description(image)
+    # Return ai-generated description
+    return desc2
+
+def transcribe(audio):
+    # Wrapper; ruft eine der drei Whisper-Transcribe-Varianten auf.
+    # Favorit: das beschleunigte whisper-s2t
+    # (das aber erst CTranslate2 mit METAL-Unterstützung braucht auf dem Mac
+    # bzw. CUDA auf Windows-Rechnern)
+    try:
+        text = transcribe_whisper(audio)
+        # return transcribe_jax(audio)
+        # return transcribe_ws2t(audio)
+        return text
+    except Exception:
+        return ""
+
+def transcribe_whisper(fname, model="large-v3-turbo"):
+    # Vanilla Whisper. Womöglich nicht die schnellste Lösung.
+    stt = whisper.load_model(model)
+    result = stt.transcribe(fname)
+    return result['text']
+
+def transcribe_jax(audio):
+    # Nutzt nicht die Standard-Whisper-Bibliothek zum Transkribieren,
+    # sondern das verbesserte JAX - das beim ersten Durchlauf sehr langsam ist,
+    # weil es erst etwas herunterladen und übersetzen muss; danach geht's flotter.
+    # Installieren mit:
+    # pip install git+https://github.com/sanchit-gandhi/whisper-jax.git
+    # Auch noch jax importieren?
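+    # Hinweis: whisper-jax setzt zusätzlich eine installierte JAX-Version voraus
+    # (z.B. "pip install jax"); ein explizites "import jax" ist im Code nicht nötig.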
+ # + # Projektseite: https://github.com/sanchit-gandhi/whisper-jax + # + # Das hier galt bei Whisper, bei whisper-jax noch prüfen: + # Speichert die Modelle unter ~/.cache/whisper/ ab; da auf meinem Mac schon Whisper-Modelle + # geladen sind, nutze ich den zusätzlichen Parameter + # download_root="{path to the directory to download models}" + from whisper_jax import FlaxWhisperPipline + from typing import NamedType + + # instantiate pipeline + pipeline = FlaxWhisperPipline("openai/whisper-large-v3-turbo") + + text = pipeline(audio) + + return text + +import os +import whisper_s2t + +def transcribe_ws2t(file_path, model_name="large-v3-turbo", output_format="txt"): + """ + Transcribe an audio/video file using WhisperS2T. + + Args: + file_path (str): Path to the .ogg or .mp4 file. + model_name (str): Whisper model to use (e.g., "small", "medium", "large"). + output_format (str): Output format ("txt" or "json"). + + Returns: + str: Transcription text. + """ + if not os.path.exists(file_path): + raise FileNotFoundError(f"File not found: {file_path}") + + # Initialize the WhisperS2T pipeline + model = whisper_s2t.load_model(model_identifier="medium", backend='CTranslate2') + files = [file_path] + lang_codes = ['de'] + tasks = ['transcribe'] + initial_prompts = [None] + + out = model.transcribe_with_vad(files, + lang_codes=lang_codes, + tasks=tasks, + initial_prompts=initial_prompts, + batch_size=24) + + return out +