diff --git a/.DS_Store b/.DS_Store index dda6348014c8548b0ea72114bfd5e0995f7b9e8c..00c4a4f7c54eca4296ad66a0d604a1bd4758d156 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/main_tg.py b/main_tg.py index c89fff058e93beb685f395e22e1d23e58992af0d..c953dd9c4e77aae2d93a2a17b3ef80ba1ea75192 100644 --- a/main_tg.py +++ b/main_tg.py @@ -1,51 +1,55 @@ from src.aichecker.check_tg import * from src.aichecker.detectora import query_detectora -from src.aichecker.imagecheck import query_aiornot +from src.aichecker.aiornot import query_aiornot TEST = False -def count_posts(posts, threshold): - text_count = 0 - score_count = 0 - - for post in posts: - if 'text' in post: - text_count += 1 +# Hilfsfunktion: CSV einlesen und als df ausgeben +def reimport_csv(fname): + df = pd.read_csv(fname) + # Diese Spalten sind dict: + structured_columns = ['photo', 'sticker', 'video', 'voice', 'forward', 'links'] + for c in structured_columns: + df[c] = df[c].apply(convert_to_obj) + # AIORNOT-Bewertung sind dict + df['aiornot_ai_score'] = df['aiornot_ai_score'].apply(convert_to_obj) + return df + if __name__ == "__main__": # tg_check handle_str = input("Handle des Kanals eingeben: ") #handle_str = "telegram" handle = tgc_clean(handle_str) - profile_dict = tgc_profile(handle) - last_post = profile_dict['n_posts'] - if profile_dict is None: + profile = tgc_profile(handle) + if profile is None: print("Kein Konto mit diesem Namen gefunden.") exit() - print(f"Analysiert wird: {profile_dict['name']}") - print(f"{profile_dict['description']}") + last_post = profile['n_posts'] + print(f"Analysiert wird: {profile['name']}") + print(f"{profile['description']}") print() - print(f"Subscriber: {profile_dict['subscribers']}") - print(f"Posts: {profile_dict['n_posts']}") - print(f"Fotos: {profile_dict['photos']}") - print(f"Videos: {profile_dict['videos']}") - print(f"Links: {profile_dict['links']}") + print(f"Subscriber: {profile['subscribers']}") + print(f"Posts: {profile['n_posts']}") + print(f"Fotos: {profile['photos']}") + print(f"Videos: {profile['videos']}") + print(f"Links: {profile['links']}") print() if TEST: # Lies eine Seite (mit bis zu 16 Posts), ohne Mediendateien anzulegen # und ohne Audios zu transkribieren - posts = tgc_blockread(profile_dict['name'],nr=1, save=False, describe=False) + posts = tgc_blockread(profile['name'],nr=1, save=False, describe=False) # Jetzt die aktuellsten Posts, mit Transkription/Mediendateien #posts = tgc_read(channels_dict['name'],nr=None, save=True, transcribe=True) #print(posts) # Nur ein einzelner Post - posts = tgc_read(profile_dict['name'],nr=last_post) + posts = tgc_read(profile['name'],nr=last_post) print(posts) # Über die Post-URL print(tgc_read_url('https://t.me/telegram/46',save=True, describe=True)) # Ein Bereich - posts = tgc_read_range(profile_dict['name'], last_post - 19, last_post, save = True, describe= True) + posts = tgc_read_range(profile['name'], last_post - 19, last_post, save = True, describe= True) # Ein einzelner Post mit Video, Vorschaubild und Text - posts = tgc_read_range("telegram", 295, 295, True, True) + posts = tgc_read_range("fragunsdochDasOriginal", 27170, 27170, True, True) post = posts[0] print("KI-Check:") if 'detectora_ai_score' not in post: @@ -53,15 +57,18 @@ if __name__ == "__main__": # post['detectora_ai_score'] = detectora_wrapper(post['text']) print(f"Detectora-Score: {query_detectora(post['text'])}") if 'aiornot_ai_score' not in post: - if post['photo'] is not None: + if post['video'] is not None: + # Audio des Videos analysieren + post['aiornot_ai_score'] = aiornot_wrapper(post['video'].get('url'), is_image = False) + print("Video: AIORNOT-Score") # Bild analysieren # Das hier ist für die Galerie: AIORNOT kann derzeit # keine base64-Strings checken. # Das Problem an den URLs der Photos ist: sie sind nicht garantiert. base64_image = post['photo'].get('image',None) image = f"data:image/jpeg;base64, {base64_image}" - #post['aiornot_ai_score'] = aiornot_wrapper(post['photo'].get('url')) - print("AIORNOT-AI-Score: {query_aiornot(post['photo']['url']}") + post['aiornot_ai_score'] = aiornot_wrapper(post['photo'].get('url')) + print("AIORNOT-AI-Score: {post['aiornot_ai_score']}") # Videos kann man nur über das Audio auf KI checken. # Muss ich erst noch implementieren. # Die telegram-Videos haben kein Audio; deshalb ist das hier nicht schlimm @@ -71,8 +78,9 @@ if __name__ == "__main__": os.makedirs('tg-checks') filename = f'tg-checks/{handle}.csv' if os.path.exists(filename): - existing_df = pd.read_csv(filename) - print(f"Dieser Kanal wurde schon einmal ausgelesen, zuletzt: {max(existing_df[''])}") + existing_df = reimport_csv(filename) + max_nr = max(existing_df['nr']) + print(f"Dieser Kanal wurde schon einmal ausgelesen, zuletzt Post Nr.: {max_nr} - seitdem {last_post-max_nr} neue Posts") # Lies die 20 aktuellsten Posts, sichere und analysiere sie # # KONSTANTEN @@ -89,26 +97,29 @@ if __name__ == "__main__": n_ai_images = 0 n_texts = 0 n_ai_texts = 0 + n_videos = 0 + n_ai_videos = 0 for post in checked_posts: - if post['text'] is not None: - n_texts += 1 - # Detectora-Score für diesen Text abrufen; wenn über der Schwelle, - # KI-Texte um eins hochzählen - n_ai_texts += 1 if posts.get('detectora_ai_score',0) > DETECTORA_T else 0 - if post['image'] is not None: - n_images += 1 - try: - # Abruf des Keys kann scheitern, wenn kein Score, deshalb mit Try - ai_score = post['aiornot_ai_score']['ai']['confidence'] - except: - # Kein Key abrufbar? Score 0 - ai_score = 0 - n_ai_images += 1 if ai_score > AIORNOT_T else 0 + if post['text'] is not None: + n_texts += 1 + # Detectora-Score für diesen Text abrufen; wenn über der Schwelle, + # KI-Texte um eins hochzählen + n_ai_texts += 1 if post.get('detectora_ai_score',0) > DETECTORA_T else 0 + if post['photo'] is not None: + n_images += 1 + ai_score = post['aiornot_ai_score'].get('confidence',0) + n_ai_images += 1 if ai_score > AIORNOT_T else 0 + if post['video'] is not None: + n_videos += 1 + ai_score = post['aiornot_ai_score'].get('confidence', 0) + n_ai_videos += 1 if ai_score > AIORNOT_T else 0 + print(f"In den {N} Posts: ") - print(f" - Texte: {n_texts}, davon KI-verdächtig: (Schwelle: {n_ai_texts})") - print(f" - Bilder: {n_images}, davon KI-verdächtig: {n_ai_images}") + print(f" - Texte: {n_texts}, davon KI-verdächtig: {n_ai_texts} (Schwelle: {DETECTORA_T})") + print(f" - Bilder: {n_images}, davon KI-verdächtig: {n_ai_images} (Schwelle: {AIORNOT_T})") print(f"Ergebnis wird in 'tg-checks/{handle}.csv' mit abgespeichert. ") df = pd.DataFrame(posts) if ('existing_df' in globals()): df = pd.concat([existing_df, df]).drop_duplicates(subset=['uri']).reset_index(drop=True) - df.to_csv(f'tg-checks/{handle}.csv', index=False) # Save to CSV for example \ No newline at end of file + df.to_csv(f'tg-checks/{handle}.csv', index=False) # Save to CSV for example + diff --git a/src/aichecker/__init__.py b/src/aichecker/__init__.py index a89edfd62b64d27ce14087cf69b7ef1a80d2e0cf..0a54847067cced94cec1d7abd3027159348230fd 100644 --- a/src/aichecker/__init__.py +++ b/src/aichecker/__init__.py @@ -1,5 +1,5 @@ from .check_bsky import * from .transcribe import ai_description from .detectora import query_detectora -from .imagecheck import query_aiornot +from .aiornot import query_aiornot from .check_tg import tgc_clean, tgc_read, tgc_blockread, tgc_read_url, tgc_profile \ No newline at end of file diff --git a/src/aichecker/imagecheck.py b/src/aichecker/aiornot.py similarity index 66% rename from src/aichecker/imagecheck.py rename to src/aichecker/aiornot.py index 9150cae474f2e7ab0e0b67f3bec5d05bb5d14fba..239a008e502d01d4d112bd48d728d4ad3aa4bed7 100644 --- a/src/aichecker/imagecheck.py +++ b/src/aichecker/aiornot.py @@ -1,5 +1,8 @@ -# imagecheck.py +# aiornot.py # Erfragt KI-Wahrscheinlichkeit für ein Bild über Hive- und AIorNot-API +# +# Inzwischen entdeckt: brauchen wir eigentlich nicht. +# https://github.com/aiornotinc/aiornot-python from .transcribe import ai_description import requests @@ -8,25 +11,34 @@ import os import time # Konstanten # -endpoint_url = "https://api.aiornot.com/v1/reports/image" +image_endpoint_url = "https://api.aiornot.com/v1/reports/image" +audio_endpoint_url = "https://api.aiornot.com/v1/reports/audio" -def query_aiornot(image): - # Erwartet URI eines Bildes. +def query_aiornot(content, is_image = False): + # Erwartet URI eines Bildes (Bildcheck) + # + # Der Detektor kann die Typen image/apng, image/gif, image/jpeg, image/png, image/svg+xml, image/webp verarbeiten. + # # Derzeit kann die AIORNOT-API keine base64-Bilder verarbeiten; d.h.: Eine URI der Form # "data:image/jpeg;base64, ..." führt zu einem 400-Fehler. # (Also in diesem Fall: Datei abspeichern und über files= hochladen. ) # # Wichtigste Rückgabewerte im dict: # - 'verdict' ('human' oder 'ai') - # - 'ai'/'confidence' (wie sicher ist sich das Modell?) - # - 'generator' ist ein dict, das für die vier geprüften Modelle + # - 'ai'/'confidence' bzw. 'confidence' für Audio-Checks (wie sicher ist sich das Modell?) + # - bei Bildern: 'generator' ist ein dict, das für die vier geprüften Modelle # 'dall_e', 'stable_diffusion', 'this_person_does_not_exist' und 'midjourney' # jeweils einen 'confidence'-Wert angibt. # # AIORNot-API-Dokumentation: https://docs.aiornot.com/#5b3de85d-d3eb-4ad1-a191-54988f56d978 + if is_image: + endpoint_url = image_endpoint_url + else: + endpoint_url = audio_endpoint_url + data = json.dumps({ - 'object': image, + 'object': content, }) api_key = os.environ.get('AIORNOT_API_KEY') headers = { @@ -35,20 +47,34 @@ def query_aiornot(image): 'Accept': 'application/json', } # Base64-Datei? Temporären File abspeichern und über files= hochladen - if image.startswith("data:image/"): + if content.startswith("data:image/"): headers = { 'Authorization': f"Bearer {api_key}", 'Accept': 'application/json', } - fname = save_string_to_temp(image) + fname = save_string_to_temp(content) try: response = requests.post(endpoint_url, headers=headers, files={'object': open(fname, 'rb')}) - except Exception as e: - print("Fehler beim Verbinden mit der AIORNOT-API über multipart:", str(e)) + print("Fehler beim Verbinden mit der AIORNOT-API (Bild) über multipart:", str(e)) return None + # Dateiname? Dann mit Multipart-Header + if not (content.startswith("http://") or content.startswith("https://")): + fname = + headers = { + 'Authorization': f"Bearer {api_key}", + 'Accept': 'application/json', + } + try: + response = requests.post(endpoint_url, + headers=headers, + files={'object': open(fname, 'rb')}) + except Exception as e: + print("Fehler beim Verbinden mit der AIORNOT-API (Bild) über multipart:", str(e)) + return None + try: response = requests.post(endpoint_url, headers=headers, @@ -61,7 +87,7 @@ def query_aiornot(image): # Success return response.json()['report'] elif response.status_code == 400: - print("AIORNOT: Fehlerhafte API-Anfrage") + print("AIORNOT: Fehlerhafte API-Anfrage {}") return None elif response.status_code == 401: print(f"AIORNOT-API-Key 'api_key' nicht gültig") @@ -82,9 +108,9 @@ def query_aiornot(image): return None # Hilfsfunktion: base64 als Temp-File speichern +# Example base64 image string: "data:image/jpeg;base64,/9j/4AAQSkZJRgABAQEAYABgAAD..." import base64 -# Example base64 image string: "data:image/jpeg;base64,/9j/4AAQSkZJRgABAQEAYABgAAD..." def save_string_to_temp(image, fname="./temp"): header, encoded = image.split(",", 1) # Leerzeichen entfernen @@ -98,4 +124,5 @@ def save_string_to_temp(image, fname="./temp"): file_name = f"{fname}.{file_extension}" with open(file_name, "wb") as image_file: image_file.write(image_data) - return file_name \ No newline at end of file + return file_name + diff --git a/src/aichecker/check_tg.py b/src/aichecker/check_tg.py index 76f1eb02978469420e2fa0ca58f9095a4bacc690..57a19f1a87b27bd0bcdd7c89282608c63714107c 100644 --- a/src/aichecker/check_tg.py +++ b/src/aichecker/check_tg.py @@ -54,6 +54,9 @@ def tgc_profile(channel="telegram"): except requests.exceptions.RequestException: print(f"Warning: Channel {c} not found") return None + # Kein Channel? Channel haben immer wenigstens einen Namen in der Infokarte + if tgm.select_one("div.tgme_channel_info") is None: + return None if tgm.select_one("div.tgme_channel_info_description") is not None: description = tgm.select_one("div.tgme_channel_info_description").get_text() else: @@ -63,12 +66,18 @@ def tgc_profile(channel="telegram"): for info_counter in tgm.find_all('div', class_='tgme_channel_info_counter'): counter_value = info_counter.find('span', class_='counter_value').text.strip() counter_type = info_counter.find('span', class_='counter_type').text.strip() + # Sonderbedingungen: nur 1 Link, nur 1 Foto, nur 1 Video? Umbenennen für Konsistenz + if counter_type in ['photo', 'video', 'link', 'subscriber']: + counter_type += "s" channel_info[counter_type] = extract_k(counter_value) # The last post is visible on this page. Gather its number and date. - last_post_href = tgm.select('a.tgme_widget_message_date')[-1]['href'] - channel_info['n_posts'] = int(re.search(r'[0-9]+$', last_post_href).group()) - + # Wenn das Konto noch nicht gepostet hat: Abbruch. + if tgm.select_one("div.tgme_widget_message") is None: + channel_info['n_posts'] = 0 + else: + last_post_href = tgm.select('a.tgme_widget_message_date')[-1]['href'] + channel_info['n_posts'] = int(re.search(r'[0-9]+$', last_post_href).group()) return channel_info @@ -140,7 +149,7 @@ def tg_post_parse(b, save = True, describe = True): hashtags = [a['href'][3:] for a in textlinks if a['href'].startswith("?q=")] ### Die möglichen Content-Abschnitte eines Posts ### # Text - if b.select_one("div.tgme_widget_message_text_wrap") is not None: + if b.select_one("div.tgme_widget_message_text") is not None: text = b.select_one("div.tgme_widget_message_text").get_text() # Polls: Text der Optionen extrahieren elif b.select_one("div.tgme_widget_message_poll") is not None: @@ -374,14 +383,17 @@ def check_tg_list(posts, check_images = True): # Okay, es geht weiter: Bilder auf KI prüfen for post in posts: if 'aiornot_ai_score' not in post: - if post['photo'] is not None: + if post['video'] is not None: + # Audio des Videos analysieren + post['aiornot_ai_score'] = aiornot_wrapper(post['video'].get('file'), is_image = False) + elif post['photo'] is not None: # Bild analysieren # Das hier ist für die Galerie: AIORNOT kann derzeit # keine base64-Strings checken. # Das Problem an den URLs der Photos ist: sie sind nicht garantiert. base64_image = post['photo'].get('image',None) image = f"data:image/jpeg;base64, {base64_image}" - post['aiornot_ai_score'] = aiornot_wrapper(post['photo'].get('url')) + post['aiornot_ai_score'] = aiornot_wrapper(post['photo'].get('url')) return posts # Wrapper für die check_tg_list Routine. # Gibt Resultate als df zurück, arbeitet aber hinter den Kulissen mit diff --git a/src/aichecker/check_wrappers.py b/src/aichecker/check_wrappers.py index 115b851abbc5bc117182b800ea53ca38e292c733..29463fd5280f43ac087871b956596d149a5becbd 100644 --- a/src/aichecker/check_wrappers.py +++ b/src/aichecker/check_wrappers.py @@ -1,5 +1,5 @@ from .detectora import query_detectora -from .imagecheck import query_aiornot +from .aiornot import query_aiornot from .transcribe import gpt4_description # Konstante @@ -19,23 +19,27 @@ def detectora_wrapper(text: str): print(f"\b{'X' if score >= d_thresh else '.'}",end="") return score -def aiornot_wrapper(image): +def aiornot_wrapper(content, is_image = True): # Verpackung. Fortschrittsbalken. - if image is None: + if content is None: print(" ", end="") return # Fortschrittsbalken print("?", end="") - aiornot_report = query_aiornot(image) - # Beschreibung: https://docs.aiornot.com/#5b3de85d-d3eb-4ad1-a191-54988f56d978 - aiornot_dict = ({ - 'link_id': image, - 'aiornot_score': aiornot_report['verdict'], - 'aiornot_confidence': aiornot_report['ai']['confidence'], - 'aiornot_generator': aiornot_report['generator'], + report = query_aiornot(content, is_image) + # Beschreibung: https://docs.aiornot.com/#5b3de85d-d3eb-4ad1-a191-54988f56d978 + if report is not None: + aiornot_dict = ({ + 'aiornot_score': report['verdict'], + # Unterscheidung: Bilder haben den Confidence score im Unter-Key 'ai' + 'aiornot_confidence': report['ai']['confidence'] if 'ai' in report else report['confidence'], + 'aiornot_generator': report['generator'] if 'generator' in report else 'Audio', }) - print(f"\b{'X' if aiornot_dict['aiornot_score'] != 'human' else '.'}",end="") - return aiornot_dict + print(f"\b{'X' if aiornot_dict['aiornot_score'] != 'human' else '.'}",end="") + return aiornot_dict + else: + print("\b,") + return None def bsky_aiornot_wrapper(did,embed): diff --git a/src/aichecker/transcribe.py b/src/aichecker/transcribe.py index aaa6d5ddb7ed76ea9e8571e20ed3e5cb57a983eb..d9e9e83c389c991a27b9bdbacf4601458475dbb1 100644 --- a/src/aichecker/transcribe.py +++ b/src/aichecker/transcribe.py @@ -66,19 +66,47 @@ def ai_description(image): # Return ai-generated description return desc2 -def transcribe(audio): +def transcribe(fname): # Wrapper; ruft eine der drei Whisper-Transcribe-Varianten auf. # Favorit: das beschleunigte whisper-s2t # (das aber erst CTranslate2 mit METAL-Unterstützung braucht auf dem Mac # bzw. CUDA auf Windows-Rechnern) + # + # Als erstes: Das in Telegram übliche .ogg-Audioformat konvertieren + if ".ogg" in fname.lower(): + fname = convert_ogg_to_m4a(fname) try: - text = transcribe_whisper(audio) + text = transcribe_whisper(fname) + #text = transcribe_api(fname) # return transcribe_jax(audio) # return transcribe_ws2t(audio) return text except: return "" +from pydub import AudioSegment + +def convert_ogg_to_m4a(input_file): + # Load the OGG file + try: + audio = AudioSegment.from_ogg(input_file) + # Export the audio to an M4A file + output_file = Path(input_file).with_suffix('.m4a') + audio.export(output_file, format="m4a") + except: + return None + + +def transcribe_api(fname): + client = OpenAI() + audio_file= open(fname, "rb") + transcription = client.audio.transcriptions.create( + model="whisper-1", + file=audio_file + ) + return (transcription.text) + + def transcribe_whisper(fname, model="large-v3-turbo"): # Vanilla Whisper. Womöglich nicht die schnellste Lösung. # Installiere einfach mit