diff --git a/.DS_Store b/.DS_Store index d8beb1756d7e8ba14b58837f7e417c386ec707f2..dda6348014c8548b0ea72114bfd5e0995f7b9e8c 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/.gitignore b/.gitignore index 05e5cd16964d79fb54ecd53284d436ca107c7c8a..6a0264f63036f2f048a7b3a6625446e89c4de7af 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ # Runtime results bsky-checks/ +media/ +tg-checks/ # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/main_tg.py b/main_tg.py index 185f8187cc059ceedd6ef09dd2806e7405d05b3a..c89fff058e93beb685f395e22e1d23e58992af0d 100644 --- a/main_tg.py +++ b/main_tg.py @@ -1,23 +1,114 @@ -from src.aichecker.tg_check import * +from src.aichecker.check_tg import * +from src.aichecker.detectora import query_detectora +from src.aichecker.imagecheck import query_aiornot +TEST = False + +def count_posts(posts, threshold): + text_count = 0 + score_count = 0 + + for post in posts: + if 'text' in post: + text_count += 1 if __name__ == "__main__": - # Bluesky-Check - #handle_str = input("Handle des Kanals eingeben: ") - handle_str = "telegram" - channels_dict = tgc_profile(handle_str) - last_post = channels_dict['n_posts'] - print(channels_dict) - # Lies eine Seite (mit bis zu 16 Posts), ohne Mediendateien anzulegen - # und ohne Audios zu transkribieren - posts = tgc_blockread(channels_dict['name'],nr=1, save=False, describe=False) - print(posts) - # Jetzt die aktuellsten Posts, mit Transkription/Mediendateien - #posts = tgc_read(channels_dict['name'],nr=None, save=True, transcribe=True) - #print(posts) - # Nur ein einzelner Post - posts = tgc_read(channels_dict['name'],nr=last_post) - print(posts) - # Über die Post-URL - print(tgc_read_url('https://t.me/telegram/46',save=True, describe=True)) - posts = tgc_read_range(channels_dict['name'], last_post - 19, last_post, save = True, describe= True) - print("Ende") \ No newline at end of file + # tg_check + handle_str = input("Handle des Kanals eingeben: ") + #handle_str = "telegram" + handle = tgc_clean(handle_str) + profile_dict = tgc_profile(handle) + last_post = profile_dict['n_posts'] + if profile_dict is None: + print("Kein Konto mit diesem Namen gefunden.") + exit() + print(f"Analysiert wird: {profile_dict['name']}") + print(f"{profile_dict['description']}") + print() + print(f"Subscriber: {profile_dict['subscribers']}") + print(f"Posts: {profile_dict['n_posts']}") + print(f"Fotos: {profile_dict['photos']}") + print(f"Videos: {profile_dict['videos']}") + print(f"Links: {profile_dict['links']}") + print() + if TEST: + # Lies eine Seite (mit bis zu 16 Posts), ohne Mediendateien anzulegen + # und ohne Audios zu transkribieren + posts = tgc_blockread(profile_dict['name'],nr=1, save=False, describe=False) + # Jetzt die aktuellsten Posts, mit Transkription/Mediendateien + #posts = tgc_read(channels_dict['name'],nr=None, save=True, transcribe=True) + #print(posts) + # Nur ein einzelner Post + posts = tgc_read(profile_dict['name'],nr=last_post) + print(posts) + # Über die Post-URL + print(tgc_read_url('https://t.me/telegram/46',save=True, describe=True)) + # Ein Bereich + posts = tgc_read_range(profile_dict['name'], last_post - 19, last_post, save = True, describe= True) + # Ein einzelner Post mit Video, Vorschaubild und Text + posts = tgc_read_range("telegram", 295, 295, True, True) + post = posts[0] + print("KI-Check:") + if 'detectora_ai_score' not in post: + # Noch keine KI-Einschätzung für den Text? + # post['detectora_ai_score'] = detectora_wrapper(post['text']) + print(f"Detectora-Score: {query_detectora(post['text'])}") + if 'aiornot_ai_score' not in post: + if post['photo'] is not None: + # Bild analysieren + # Das hier ist für die Galerie: AIORNOT kann derzeit + # keine base64-Strings checken. + # Das Problem an den URLs der Photos ist: sie sind nicht garantiert. + base64_image = post['photo'].get('image',None) + image = f"data:image/jpeg;base64, {base64_image}" + #post['aiornot_ai_score'] = aiornot_wrapper(post['photo'].get('url')) + print("AIORNOT-AI-Score: {query_aiornot(post['photo']['url']}") + # Videos kann man nur über das Audio auf KI checken. + # Muss ich erst noch implementieren. + # Die telegram-Videos haben kein Audio; deshalb ist das hier nicht schlimm + print("Ende TEST") + # Schau, ob es schon Daten gibt + if not os.path.exists('tg-checks'): + os.makedirs('tg-checks') + filename = f'tg-checks/{handle}.csv' + if os.path.exists(filename): + existing_df = pd.read_csv(filename) + print(f"Dieser Kanal wurde schon einmal ausgelesen, zuletzt: {max(existing_df[''])}") + # Lies die 20 aktuellsten Posts, sichere und analysiere sie + # + # KONSTANTEN + N = 10 + DETECTORA_T = 0.8 # 80% + AIORNOT_T = 0.9 # 90% + print("Einlesen/mit KI beschreiben: ", end="") + posts = tgc_read_number(handle_str, N) + print() # für die Fortschrittsmeldung + print("Auf KI-Inhalt prüfen: ",end="") + checked_posts = check_tg_list(posts, check_images = True) + # + n_images = 0 + n_ai_images = 0 + n_texts = 0 + n_ai_texts = 0 + for post in checked_posts: + if post['text'] is not None: + n_texts += 1 + # Detectora-Score für diesen Text abrufen; wenn über der Schwelle, + # KI-Texte um eins hochzählen + n_ai_texts += 1 if posts.get('detectora_ai_score',0) > DETECTORA_T else 0 + if post['image'] is not None: + n_images += 1 + try: + # Abruf des Keys kann scheitern, wenn kein Score, deshalb mit Try + ai_score = post['aiornot_ai_score']['ai']['confidence'] + except: + # Kein Key abrufbar? Score 0 + ai_score = 0 + n_ai_images += 1 if ai_score > AIORNOT_T else 0 + print(f"In den {N} Posts: ") + print(f" - Texte: {n_texts}, davon KI-verdächtig: (Schwelle: {n_ai_texts})") + print(f" - Bilder: {n_images}, davon KI-verdächtig: {n_ai_images}") + print(f"Ergebnis wird in 'tg-checks/{handle}.csv' mit abgespeichert. ") + df = pd.DataFrame(posts) + if ('existing_df' in globals()): + df = pd.concat([existing_df, df]).drop_duplicates(subset=['uri']).reset_index(drop=True) + df.to_csv(f'tg-checks/{handle}.csv', index=False) # Save to CSV for example \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 727a4c0c3e890dfe006e0c2181cd720d3f4c3e90..e371801a5c2b1a6d80fa59c41637178db70ee871 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ authors = [ maintainers = [ {name = "Jan Eggers", email = "jan.eggers@hr.de"}, ] -version = "0.2.0.0" # Neue Versionsnummern für pip-Update +version = "0.2.1.0" # Neue Versionsnummern für pip-Update description = "Bluesky- und Telegram-Konten auf KI-Inhalte checken" requires-python = ">=3.8" dependencies = [ diff --git a/src/aichecker/__init__.py b/src/aichecker/__init__.py index bab51f32977b49229f9f453038196f1a8e9b38f5..a89edfd62b64d27ce14087cf69b7ef1a80d2e0cf 100644 --- a/src/aichecker/__init__.py +++ b/src/aichecker/__init__.py @@ -2,4 +2,4 @@ from .check_bsky import * from .transcribe import ai_description from .detectora import query_detectora from .imagecheck import query_aiornot -from .tg_check import tgc_clean, tgc_read, tgc_blockread, tgc_read_url, tgc_profile \ No newline at end of file +from .check_tg import tgc_clean, tgc_read, tgc_blockread, tgc_read_url, tgc_profile \ No newline at end of file diff --git a/src/aichecker/check_bsky.py b/src/aichecker/check_bsky.py index f9a4cca548a7c5a4e4f7aafbfdec6bcda75e7631..ff1a7e3fab9e0950af362fced82ab5a0ebeb63d8 100644 --- a/src/aichecker/check_bsky.py +++ b/src/aichecker/check_bsky.py @@ -5,54 +5,14 @@ import json import pandas as pd -from .detectora import query_detectora -from .imagecheck import query_aiornot from .transcribe import gpt4_description +from .check_wrappers import bsky_aiornot_wrapper, detectora_wrapper import requests import os # Konstante d_thresh = .8 # 80 Prozent limit = 25 # Posts für den Check - -def detectora_wrapper(text: str): - # Verpackung. Fügt nur den "Fortschrittsbalken" hinzu. - print("?", end="") - score = query_detectora(text) - if score is None: - print("\b_",end="") - else: - print(f"\b{'X' if score >= d_thresh else '.'}",end="") - return score - -def aiornot_wrapper(did,embed): - # Verpackung für die AIORNOT-Funktion: - # Checkt, ob es überhaupt ein Embed gibt, - # und ob es ein Bild enthält. - # Wenn ja: geht durch die Bilder und erstellt KI-Beschreibung und KI-Einschätzung - print("?",end="") - if 'images' in embed: - images = embed['images'] - desc = [] - for i in images: - # Construct an URL for the image thumbnail (normalised size) - link = i['image']['ref']['$link'] - i_url = f"https://cdn.bsky.app/img/feed_thumbnail/plain/{did}/{link}" - aiornot_report = query_aiornot(i_url) - # Beschreibung: https://docs.aiornot.com/#5b3de85d-d3eb-4ad1-a191-54988f56d978 - gpt4_desc = gpt4_description(i_url) - desc.append({ - 'link_id': link, - 'aiornot_score': aiornot_report['verdict'], - 'aiornot_confidence': aiornot_report['ai']['confidence'], - 'aiornot_generator': aiornot_report['generator'], - 'gpt4_description': gpt4_desc, - }) - print(f"\b{'X' if aiornot_report['verdict'] != 'human' else '.'}",end="") - return desc - else: - print("\b_",end="") - return None def call_get_author_feed(author: str, limit: int=50, cursor= None) -> list: # Sucht den Post-Feed für das Bluesky-Konto author @@ -184,7 +144,7 @@ def check_handle(handle:str, limit:int = 20, cursor = None, check_images = True) # Now add "ai" or "human" assessment for images if check_images: print("\nChecke Bilder:") - df['aiornot_ai_score'] = df.apply(lambda row: aiornot_wrapper(row['author_did'], row['embed']), axis=1) + df['aiornot_ai_score'] = df.apply(lambda row: bsky_aiornot_wrapper(row['author_did'], row['embed']), axis=1) else: df['aiornot_ai_score'] = None print() diff --git a/src/aichecker/tg_check.py b/src/aichecker/check_tg.py similarity index 66% rename from src/aichecker/tg_check.py rename to src/aichecker/check_tg.py index 56ce98099c0d1f37a339256c1f87b7b84cda4d02..76f1eb02978469420e2fa0ca58f9095a4bacc690 100644 --- a/src/aichecker/tg_check.py +++ b/src/aichecker/check_tg.py @@ -14,6 +14,7 @@ import os import re import base64 from .transcribe import gpt4_description, transcribe +from .check_wrappers import detectora_wrapper, aiornot_wrapper def extract_k(n_str: str): try: @@ -31,7 +32,14 @@ def tgc_profile(channel="telegram"): channel (str) Returns: - dict with the keys 'subscribers', 'photos', 'videos', 'links' + dict with the keys + - 'channel' + - 'description' + - 'subscribers', (Number) + - 'photos' (number) + - 'videos' (number) + - 'links' (number) + - 'n_posts' (number of the last published post) Example: profile = tgc_profile("wilhelmkachel") @@ -134,6 +142,11 @@ def tg_post_parse(b, save = True, describe = True): # Text if b.select_one("div.tgme_widget_message_text_wrap") is not None: text = b.select_one("div.tgme_widget_message_text").get_text() + # Polls: Text der Optionen extrahieren + elif b.select_one("div.tgme_widget_message_poll") is not None: + text = b.select_one("div.tgme_widget_message_poll_question").get_text() + for bb in b.select("div.tgme_widget_message_poll_option_text"): + text += "\n* " + bb.get_text() else: text = None # Sticker (Beispiel: https://t.me/telegram/23) @@ -175,14 +188,22 @@ def tg_post_parse(b, save = True, describe = True): else: voice = None # Video URL (Beispiel: https://t.me/telegram/46) + # Wenn ein Thumbnail/Startbild verfügbar ist - das ist leider nur bei den + # Channel-Seiten, nicht bei den Einzel-Post-Seiten der Fall - speichere + # es ab wie ein Photo. if b.select_one('video.tgme_widget_message_video') is not None: video_url = b.select_one('video.tgme_widget_message_video')['src'] if b.select_one('tgme_widget_message_video_thumb') is not None: - video_thumbnail = re.search(r"(?<=image\:url\('\)).+(?=\')",b.select_one('tgme_widget_message_video_thumb')['style'].group(0)) + video_thumbnail_url = re.search(r"(?<=image\:url\('\)).+(?=\')",b.select_one('tgme_widget_message_video_thumb')['style'].group(0)) video = {'url': video_url, - 'thumbnail': video_thumbnail, - 'image': base64.b64encode(requests.get(video_thumbnail).content).decode('utf-8') + 'thumbnail': video_thumbnail_url, } + if save or describe: + # Thumbnail wird unter photo abgespeichert + photo = {'url': video_thumbnail_url, + 'image': base64.b64encode(requests.get(video_thumbnail_url).content).decode('utf-8') + } + photo['file'] = save_url(video_thumbnail_url, f"{channel}_{b_nr}_photo") else: video = {'url': video_url, } @@ -190,8 +211,8 @@ def tg_post_parse(b, save = True, describe = True): video['file'] = save_url(video_url, f"{channel}_{b_nr}_video") if describe: video['transcription'] = transcribe(video['file']) - if 'image' in video: - video['description'] = f"data:image/jpeg;base64,{video['image']}" + if photo is not None: + photo['description'] = gpt4_description(f"data:image/jpeg;base64, {photo['image']}") else: video = None # Document / Audio URL? https://t.me/telegram/35 @@ -208,13 +229,16 @@ def tg_post_parse(b, save = True, describe = True): } else: forward = None - - + # Poll, Beispiel: https://t.me/wilhelmkachel/1079 + poll_type = b.select_one("div.tgme_widget_message_poll_type") + if poll_type is not None: + poll_type = poll_type.get_text() # None wenn nicht vorhanden post_dict = { 'channel': channel, 'nr': b_nr, 'url': post_url, - 'views': views, # Momentaufnahme! + 'views': views, # Momentaufnahme! Views zum Zeitpunkt views_ts + 'views_ts': datetime.now().isoformat(), # Zeitstempel für die Views 'timedate': timestamp, 'text': text, 'photo': photo, @@ -222,12 +246,14 @@ def tg_post_parse(b, save = True, describe = True): 'video': video, 'voice': voice, 'forwards': forward, + 'poll': poll_type, 'links': links, 'hashtags': [f"#{tag}" for tag in hashtags], } return post_dict def tgc_read(cname, nr, save=True, describe = False): + # Einzelnen Post lesen: URL erzeugen, aufrufen. c = tgc_clean(cname) channel_url = f"https://t.me/{c}/{nr}" return tgc_read_url(channel_url) @@ -276,8 +302,10 @@ def tgc_blockread(cname="telegram", nr=None, save=True, describe=False): tgm = BeautifulSoup(response.content, 'html.parser') block = tgm.select("div.tgme_widget_message_wrap") - block_list = [tg_post_parse(b, save, describe) for b in block] - return block_list + posts = [tg_post_parse(b, save, describe) for b in block] + # Posts aufsteigend sortieren + posts.sort(key=lambda x: x['nr']) + return posts def tgc_read_range(cname, n1=1, n2=None, save=True, describe = True): # Liest einen Bereich von Posts @@ -296,5 +324,88 @@ def tgc_read_range(cname, n1=1, n2=None, save=True, describe = True): # Abbruchbedingungen: Letzten Post des Channels erreicht, oder Ende des zu lesenden Bereichs loop = (max_nr == last_nr) or (last_nr > n2) posts.extend(new_posts) - - return posts \ No newline at end of file + # Posts aufsteigend sortieren + posts.sort(key=lambda x: x['nr']) + return posts + +def tgc_read_number(cname, n = 20, cutoff = None, save=True, describe = True): + # Liest eine Anzahl n von Posts, beginnend bei Post cutoff (wenn cutoff=None, dann den aktuellsten) + # Zuerst: Nummer des letzten Posts holen + profile = tgc_profile(cname) + # Sicherheitscheck: erste Post-Nummer überhaupt schon gepostet? + max_nr = profile['n_posts'] + if cutoff is None: + cutoff = max_nr + elif cutoff > max_nr: + return None + posts = [] + while len(posts) < n: + # Blockread-Routine liest immer ein ganzes Stück der Seite + new_posts = tgc_blockread(cname, cutoff, save, describe) + nr_values = [post['nr'] for post in new_posts] + posts.extend(new_posts) + # Abbruchbedingung: erster Post erreicht + if cutoff == 1: + break + cutoff = cutoff - 16 + if cutoff < 1: + cutoff = 1 + # Posts aufsteigend sortieren + posts.sort(key=lambda x: x['nr']) + return posts + +## Routinen zum Check der letzten 20(...) Posts eines Telegram-Channels +# analog zu check_handle in der check_bsky-Library +# +# Hinter den Kulissen werden Listen von Post-dicts genutzt + +# Routine checkt eine Post-Liste, wie sie aus den tgc_read... Routinen kommen. +# Wenn noch kein KI-Check vorliegt, wird er ergänzt. +# Setzt allerdings voraus, dass die entsprechenden Inhalte schon abgespeichert sind. +def check_tg_list(posts, check_images = True): + for post in posts: + if 'detectora_ai_score' not in post: + # Noch keine KI-Einschätzung für den Text? + post['detectora_ai_score'] = detectora_wrapper(post['text']) + # Leerzeile für den Fortschrittsbalken + print() + if not check_images: + return + # Okay, es geht weiter: Bilder auf KI prüfen + for post in posts: + if 'aiornot_ai_score' not in post: + if post['photo'] is not None: + # Bild analysieren + # Das hier ist für die Galerie: AIORNOT kann derzeit + # keine base64-Strings checken. + # Das Problem an den URLs der Photos ist: sie sind nicht garantiert. + base64_image = post['photo'].get('image',None) + image = f"data:image/jpeg;base64, {base64_image}" + post['aiornot_ai_score'] = aiornot_wrapper(post['photo'].get('url')) + return posts +# Wrapper für die check_tg_list Routine. +# Gibt Resultate als df zurück, arbeitet aber hinter den Kulissen mit +# einer Liste von dicts (anders als check_bsky) + +def check_tgc(cname, n=20, cursor = None, check_images = True): + + exit("Funktion noch nicht definiert") + return None + +def retrieve_tg_csv(cname, path= "tg-checks"): + fname = path + "/" + cname + ".csv" + if os.path.exists(fname): + df = pd.read_csv(fname) + # reformat the columns containing dicts + + return df + else: + return None + +def append_tg_csv(cname, posts_list, path = "tg-checks"): + existing_df = retrieve_tg_csv(cname, path) + df = pd.DataFrame(posts_list) + if existing_df is not None: + df = pd.concat([existing_df, df]).drop_duplicates(subset=['uri']).reset_index(drop=True) + df.to_csv(path + "/" + cname + ".csv", index=False) + diff --git a/src/aichecker/check_wrappers.py b/src/aichecker/check_wrappers.py new file mode 100644 index 0000000000000000000000000000000000000000..115b851abbc5bc117182b800ea53ca38e292c733 --- /dev/null +++ b/src/aichecker/check_wrappers.py @@ -0,0 +1,59 @@ +from .detectora import query_detectora +from .imagecheck import query_aiornot +from .transcribe import gpt4_description + +# Konstante +d_thresh = .8 # 80 Prozent +limit = 25 # Posts für den Check + +def detectora_wrapper(text: str): + # Verpackung. Fügt nur den "Fortschrittsbalken" hinzu. + print("?", end="") + if text is None: + print("\b_",end="") + return None + score = query_detectora(text) + if score is None: + print("\b_",end="") + else: + print(f"\b{'X' if score >= d_thresh else '.'}",end="") + return score + +def aiornot_wrapper(image): + # Verpackung. Fortschrittsbalken. + if image is None: + print(" ", end="") + return + # Fortschrittsbalken + print("?", end="") + aiornot_report = query_aiornot(image) + # Beschreibung: https://docs.aiornot.com/#5b3de85d-d3eb-4ad1-a191-54988f56d978 + aiornot_dict = ({ + 'link_id': image, + 'aiornot_score': aiornot_report['verdict'], + 'aiornot_confidence': aiornot_report['ai']['confidence'], + 'aiornot_generator': aiornot_report['generator'], + }) + print(f"\b{'X' if aiornot_dict['aiornot_score'] != 'human' else '.'}",end="") + return aiornot_dict + + +def bsky_aiornot_wrapper(did,embed): + # Verpackung für die AIORNOT-Funktion: + # Checkt, ob es überhaupt ein Embed gibt, + # und ob es ein Bild enthält. + # Wenn ja: geht durch die Bilder und erstellt KI-Beschreibung und KI-Einschätzung + if 'images' in embed: + images = embed['images'] + desc = [] + for i in images: + # Construct an URL for the image thumbnail (normalised size) + link = i['image']['ref']['$link'] + i_url = f"https://cdn.bsky.app/img/feed_thumbnail/plain/{did}/{link}" + aiornot_report = aiornot_wrapper(i_url) + aiornot_report['gpt4_description'] = gpt4_description(image) + desc.append(aiornot_report) + return desc + else: + print("\b_",end="") + return None \ No newline at end of file diff --git a/src/aichecker/detectora.py b/src/aichecker/detectora.py index 84afc8d97e687f9f9b4b8ed26743432b05279af9..4c2d2b2c7d455a03de36254c4858806b69a44ac8 100644 --- a/src/aichecker/detectora.py +++ b/src/aichecker/detectora.py @@ -35,6 +35,9 @@ def query_detectora(text): 'query': text, } api_key = os.environ.get('DETECTORA_API_KEY') + if api_key is None or api_key == "": + print("DETECTORA_API_KEY ist nicht gesetzt") + return None headers = { 'APIKey': api_key, 'Content-Type': 'application/json', @@ -51,9 +54,9 @@ def query_detectora(text): print(f"DETECTORA: Fehlerhafte API-Anfrage: \'{data['query']}\'") return None elif response.status_code == 401: - print(f"DETECTORA-API-Key 'api_key' nicht gültig") + print(f"DETECTORA_API_KEY {api_key} nicht gültig") + return None except Exception as e: print("Fehler beim Verbinden mit der DETECTORA-API:", str(e)) return None - return response[''] diff --git a/src/aichecker/imagecheck.py b/src/aichecker/imagecheck.py index 9a00a30066786331bbe1df2c39c522ac7843e6ef..9150cae474f2e7ab0e0b67f3bec5d05bb5d14fba 100644 --- a/src/aichecker/imagecheck.py +++ b/src/aichecker/imagecheck.py @@ -11,7 +11,11 @@ import time endpoint_url = "https://api.aiornot.com/v1/reports/image" def query_aiornot(image): - # Erwartet URI eines Bildes + # Erwartet URI eines Bildes. + # Derzeit kann die AIORNOT-API keine base64-Bilder verarbeiten; d.h.: Eine URI der Form + # "data:image/jpeg;base64, ..." führt zu einem 400-Fehler. + # (Also in diesem Fall: Datei abspeichern und über files= hochladen. ) + # # Wichtigste Rückgabewerte im dict: # - 'verdict' ('human' oder 'ai') # - 'ai'/'confidence' (wie sicher ist sich das Modell?) @@ -30,35 +34,68 @@ def query_aiornot(image): 'Content-Type': 'application/json', 'Accept': 'application/json', } + # Base64-Datei? Temporären File abspeichern und über files= hochladen + if image.startswith("data:image/"): + headers = { + 'Authorization': f"Bearer {api_key}", + 'Accept': 'application/json', + } + fname = save_string_to_temp(image) + try: + response = requests.post(endpoint_url, + headers=headers, + files={'object': open(fname, 'rb')}) + + except Exception as e: + print("Fehler beim Verbinden mit der AIORNOT-API über multipart:", str(e)) + return None try: response = requests.post(endpoint_url, headers=headers, data=data ) - if response.status_code == 200: - # Success - return response.json()['report'] - elif response.status_code == 400: - print("AIORNOT: Fehlerhafte API-Anfrage") - return None - elif response.status_code == 401: - print(f"AIORNOT-API-Key 'api_key' nicht gültig") - return None - elif response.status_code == 429: - # Zu viele Anfragen; also warten und nochmal fragen - time.sleep(1) - response = requests.post(endpoint_url, - headers=headers, - data=data - ) - # Immer noch 429? Dann sind wahrscheinlich die Credits aufgebraucht - if response.status_code == 429: - print("AIORNOT: Credits verbraucht") - return None - else: - return response.json()['report'] except Exception as e: print("Fehler beim Verbinden mit der AIORNOT-API:", str(e)) return None + if response.status_code == 200: + # Success + return response.json()['report'] + elif response.status_code == 400: + print("AIORNOT: Fehlerhafte API-Anfrage") + return None + elif response.status_code == 401: + print(f"AIORNOT-API-Key 'api_key' nicht gültig") + return None + elif response.status_code == 429: + # Zu viele Anfragen; also warten und nochmal fragen + time.sleep(1) + response = requests.post(endpoint_url, + headers=headers, + data=data + ) + # Immer noch 429? Dann sind wahrscheinlich die Credits aufgebraucht + if response.status_code == 429: + print("AIORNOT: Credits verbraucht") + return None + else: + return response.json()['report'] return None +# Hilfsfunktion: base64 als Temp-File speichern +import base64 + +# Example base64 image string: "data:image/jpeg;base64,/9j/4AAQSkZJRgABAQEAYABgAAD..." +def save_string_to_temp(image, fname="./temp"): + header, encoded = image.split(",", 1) + # Leerzeichen entfernen + while encoded[0] == " ": + encoded = encoded[1:] + # Step 2: Decode the base64 string to get the binary image data + image_data = base64.b64decode(encoded) + # Step 3: Write the binary data to a file + # Extract the file extension from the header + file_extension = header.split(";")[0].split("/")[1] + file_name = f"{fname}.{file_extension}" + with open(file_name, "wb") as image_file: + image_file.write(image_data) + return file_name \ No newline at end of file diff --git a/src/aichecker/transcribe.py b/src/aichecker/transcribe.py index 35e1774948b1d417bd43c54ed27a89e55c0c87b9..aaa6d5ddb7ed76ea9e8571e20ed3e5cb57a983eb 100644 --- a/src/aichecker/transcribe.py +++ b/src/aichecker/transcribe.py @@ -23,7 +23,8 @@ OLLAMA = False def gpt4_description(image_url): # Check a local image by converting it to b64: - # image_url = f"data:image/jpeg;base64,{b64_image}" + # image_url = f"data:image/jpeg;base64, {b64_image}" + print(".", end="") response = client.chat.completions.create( model="gpt-4o-mini", messages=[ @@ -80,10 +81,16 @@ def transcribe(audio): def transcribe_whisper(fname, model="large-v3-turbo"): # Vanilla Whisper. Womöglich nicht die schnellste Lösung. + # Installiere einfach mit + # pip install openai-whisper + print(".",end="") stt = whisper.load_model(model) result = stt.transcribe(fname) return result['text'] +# Hier fangen die Alternativ-Transcriber an. Sind auskommentiert, damit man den ganzen Kram +# nicht installieren muss, bevor das Paket losläuft. +""" def transcribe_jax(audio): # Nutzt nicht die Standard-Whisper-Bibliothek zum Transkribieren, # sondern das verbesserte JAX - das beim ersten Durchlauf sehr langsam ist, @@ -98,6 +105,7 @@ def transcribe_jax(audio): # Speichert die Modelle unter ~/.cache/whisper/ ab; da auf meinem Mac schon Whisper-Modelle # geladen sind, nutze ich den zusätzlichen Parameter # download_root="{path to the directory to download models}" + # from whisper_jax import FlaxWhisperPipline from typing import NamedType @@ -108,21 +116,26 @@ def transcribe_jax(audio): return text +# Library importieren mit +# pip install whisper-s2t +# oder +# pip install -U git+https://github.com/shashikg/WhisperS2T.git +# +# Problem mit Whisper: setzt auf dem Mac eine METAL-Einbindng voraus. +# https://github.com/shashikg/WhisperS2T import os import whisper_s2t def transcribe_ws2t(file_path, model_name="large-v3-turbo", output_format="txt"): - """ - Transcribe an audio/video file using WhisperS2T. - - Args: - file_path (str): Path to the .ogg or .mp4 file. - model_name (str): Whisper model to use (e.g., "small", "medium", "large"). - output_format (str): Output format ("txt" or "json"). - - Returns: - str: Transcription text. - """ +# Transcribe an audio/video file using WhisperS2T. +# +# Args: +# file_path (str): Path to the .ogg or .mp4 file. +# model_name (str): Whisper model to use (e.g., "small", "medium", "large"). +# output_format (str): Output format ("txt" or "json"). +# +# Returns: +# str: Transcription text. if not os.path.exists(file_path): raise FileNotFoundError(f"File not found: {file_path}") @@ -141,3 +154,4 @@ def transcribe_ws2t(file_path, model_name="large-v3-turbo", output_format="txt") return out +""" diff --git a/temp.jpeg b/temp.jpeg new file mode 100644 index 0000000000000000000000000000000000000000..e49e98dacae73edf01b5ce7aac898b9b57c049e3 Binary files /dev/null and b/temp.jpeg differ