Skip to content
Snippets Groups Projects
Commit 560430aa authored by Jan Eggers's avatar Jan Eggers
Browse files

AIORNOT-Audio gefixt

parent 2714927c
No related branches found
No related tags found
No related merge requests found
No preview for this file type
......@@ -47,7 +47,7 @@ if __name__ == "__main__":
image_posts = [post for post in df['aiornot_ai_score'].to_list() if post is not None]
# Liste auspacken, nur die Dicts ohne None-Elemente
image_list = [item for sublist in image_posts for item in sublist]
ai_list = [item for item in image_list if item['aiornot_score']!='human']
ai_list = [item for item in image_list if item['score']!='human']
if len(image_list) == 0:
p_ai = 0
else:
......
from src.aichecker.check_tg import *
from src.aichecker.detectora import query_detectora
from src.aichecker.aiornot import query_aiornot
from src.aichecker.transcribe import convert_mp4_to_mp3, convert_ogg_to_mp3
from ast import literal_eval
# KONSTANTEN
N = 20
DETECTORA_T = 0.8 # 80%
AIORNOT_T = 0.9 # 90%
AIORNOT_T = 0.5 # 50% - AIORNOT selbst setzt den Wert sehr niedrig an.
TEST = False
# Hilfsfunktion: CSV einlesen und als df ausgeben
def convert_to_obj(val):
if pd.isna(val):
return None
try:
return literal_eval(val)
except (ValueError, SyntaxError):
return val
def reimport_csv(fname):
df = pd.read_csv(fname)
# Diese Spalten sind dict:
structured_columns = ['photo', 'sticker', 'video', 'voice', 'forward', 'links']
structured_columns = ['photo', 'sticker', 'video', 'voice', 'forwards', 'links']
for c in structured_columns:
df[c] = df[c].apply(convert_to_obj)
# AIORNOT-Bewertung sind dict
......@@ -63,7 +75,8 @@ if __name__ == "__main__":
if 'aiornot_ai_score' not in post:
if post['video'] is not None:
# Audio des Videos analysieren
post['aiornot_ai_score'] = aiornot_wrapper(post['video'].get('file'), is_image = False)
video_file = post['video'].get('file')
post['aiornot_ai_score'] = aiornot_wrapper(convert_mp4_to_mp3(video_file), is_image = False)
print("Video: AIORNOT-Score")
# Bild analysieren
# Das hier ist für die Galerie: AIORNOT kann derzeit
......@@ -107,7 +120,7 @@ if __name__ == "__main__":
n_ai_texts += 1 if post.get('detectora_ai_score',0) > DETECTORA_T else 0
if post['photo'] is not None:
n_images += 1
ai_score = post['aiornot_ai_score'].get('confidence',0)
ai_score = post.get('aiornot_ai_score',{'confidence': 0})['confidence']
n_ai_images += 1 if ai_score > AIORNOT_T else 0
if post['video'] is not None:
n_videos += 1
......@@ -120,6 +133,6 @@ if __name__ == "__main__":
print(f"Ergebnis wird in 'tg-checks/{handle}.csv' mit abgespeichert. ")
df = pd.DataFrame(posts)
if ('existing_df' in globals()):
df = pd.concat([existing_df, df]).drop_duplicates(subset=['uri']).reset_index(drop=True)
df = pd.concat([existing_df, df]).drop_duplicates(subset=['nr']).reset_index(drop=True)
df.to_csv(f'tg-checks/{handle}.csv', index=False) # Save to CSV for example
......@@ -10,7 +10,7 @@ authors = [
maintainers = [
{name = "Jan Eggers", email = "jan.eggers@hr.de"},
]
version = "0.2.2" # Neue Versionsnummern für pip-Update
version = "0.2.3.0" # Neue Versionsnummern für pip-Update
description = "Bluesky- und Telegram-Konten auf KI-Inhalte checken"
requires-python = ">=3.8"
dependencies = [
......
......@@ -6,7 +6,7 @@ Von Channeln kann man Posts auf zwei Arten lesen:
- über ihre Kontextseite (t.me/s/<channel>/<id>)
- über ihre individuelle Post-Seite (t.me/s/<channel>/<id>)
# Kontextseite
## Kontextseite
Die Kontextseite ist etwas bequemer, denn:
......@@ -14,7 +14,7 @@ Die Kontextseite ist etwas bequemer, denn:
- sie wird auch angezeigt, wenn es die Post-ID nicht gibt
# Postseite
## Postseite
Die Postseite lädt die eigentlichen Inhalte als Embed in ein iframe. Es ist möglich, dieses Embed direkt über requests zu laden:
......@@ -24,4 +24,90 @@ Die Postseite lädt die eigentlichen Inhalte als Embed in ein iframe. Es ist mö
- Untersuchen, ob es ein Element div.tgme_widget_message_error enthält - dann konnte der Post nicht geladen werden
- Enthält ein Element div.tgme_widget_message_error
<div class="tgme_widget_message text_not_supported_wrap js-widget_message" data-post="telegram/361" data-view="eyJjIjotMTAwNTY0MDg5MiwicCI6MzYxLCJ0IjoxNzM1OTQxNTY5LCJoIjoiOGJmNWMzZDM1OTE0Y2I1NTMyIn0" data-peer="c1005640892_-6044378432856379164" data-peer-hash="556f33b85ddb50a1e1" data-post-id="361">
\ No newline at end of file
<div class="tgme_widget_message text_not_supported_wrap js-widget_message" data-post="telegram/361" data-view="eyJjIjotMTAwNTY0MDg5MiwicCI6MzYxLCJ0IjoxNzM1OTQxNTY5LCJoIjoiOGJmNWMzZDM1OTE0Y2I1NTMyIn0" data-peer="c1005640892_-6044378432856379164" data-peer-hash="556f33b85ddb50a1e1" data-post-id="361">
# Funktionen in der Library:
## tgc_clean(cname:str) -> str
Bin mir gar nicht sicher, dass das immer nötig ist; als ich 2018 mit R eine erste Telegram-Library geschrieben
habe, war die Funktion Standard bei mir - es wird Gründe gegeben haben. Schaden tut's nicht.
* **cname**: Telegram-Kanal-Name, z.B. "FragUnsDochDasOriginal"
* **Rückgabewert**: Der geputzte Namens-String ohne unzulässige Zeichen
## tgc_profile(channel: str) -> dict
Liest die Rahmendaten eines Channels aus der Profil-Karte und gibt sie als Dict zurück.
* **channel**: Telegram-Kanal-Name (str), z.B. "Telegram"
* **Rückgabewert**: ein dict mit den Keys
* 'channel': str, name des Kanals
* 'description': str, Beschreibung des Kanals
* 'image_url' und 'image': str und base64-str; das Profilbild
* 'subscribers': int Anzahl Abonnenten
* 'photos': int, Anzahl veröffentlichter Fotos
* 'videos': int, Anzahl veröffentlichter Videos
* 'links': int, Anzahl veröffentlichter LInks
* 'n_posts' int, Nummer des letzten publizierten Posts
n_posts entspricht streng genommen nicht ganz der Anzahl der veröffentlichten Posts, weil diese Zahl insbesondere zu Beginn eines Kanals springen kann.
## tgc_read, tgc_blockread etc.
Auslesen von Telegram-Posts über die Kontext- bzw. Post-Seite.
Alle Read-Funktionen nutzen intern eine Funktion namens tgc_post_parse, die aus einem HTML-Objekt die Daten extrahiert und geben deshalb im wesentlichen alle dasselbe zurück: eine Liste von dict-Einträgen (für jeden Posts) mit folgenden Keys:
* 'channel': str, Kanalname
* 'nr': int, Nummer des Posts
* 'url': str, URL des Posts (so wie man sie durch Klick auf den Zeitstempel bekommt)
* 'views': int, Anzahl der Abrufe des Posts (laut Telegram - Vorsicht!)
* 'views_ts': str, Zeitpunkt des Views-Abrufs in Iso-Format
* 'timedate': str, Zeitstempel des Posts im ISO-Format
* 'text': str, Text des Posts (falls vorhanden, sonst None)
* 'photo': dict, gepostetes Bild bzw. Vorschaubild bei Videos (falls vorhanden, sonst None)
* 'url': str, Link auf Photo-Datei (könne evtl. nicht stabil sein!)
* 'image': str, base64 des Bildes
* 'description': str, KI-Inhaltsbeschreibung des Bildes (nur falls bestellt, sonst kein Key)
* 'file': str, Dateiname des gesicherten Medieninhalts. (nur falls save=True, sonst kein Key)
* 'sticker': sticker,
* 'url': str, Link auf Sticker-Image-Datei (könne evtl. nicht stabil sein!)
* 'image': str, base64 des Bildes
* 'description': str, KI-Inhaltsbeschreibung des Bildes (nur falls describe=True, sonst kein Key)
* 'file': str, Dateiname des gesicherten Medieninhalts. (nur falls save=True, sonst kein Key)
* 'video': video,
* 'url': str, Link auf Photo-Datei (könne evtl. nicht stabil sein!)
* 'image': str, base64 des Bildes
* 'description': str, KI-Inhaltsbeschreibung des Bildes (nur falls describe=True, sonst kein Key)
* 'file': str, Dateiname des gesicherten Medieninhalts. (nur falls save=True, sonst kein Key)
* 'voice': dict, Sprachnachricht, falls vorhanden
* 'url': str, Link auf Photo-Datei (könne evtl. nicht stabil sein!)
* 'duration': str, base64 des Bildes
* 'transcription': str, KI-Transkription des Audios (nur falls describe=True, sonst kein Key)
* 'file': str, Dateiname des gesicherten Medieninhalts (nur falls save=True, sonst kein Key)
* 'forwards': forward,
* 'url': str, Link zum Quell-Posts
* 'name': str, Name des Quell-Kanals
* 'poll': str, Umfrage-Typ (bekannt: 'anonymous')
* 'links': Liste der Link-Strings
* 'hashtags': Liste der Hashtag-Strings, [f"#{tag}" for tag in hashtags],
### tgc_read(cname, nr, save=True, describe = False) -> dict
Liest einzelnen Post aus dem angegebenen Kanal mit der angegebenen Nr. über Post-Seite Ruft ```tgc_read_url``` auf.
### tgc_read_url(url, save=True, describe = False) -> dict
Liest einzelnen Post über Post-Seiten-Link.
### tgc_blockread(cname="telegram", nr=None, save=True, describe=False) -> [dict]
Ruft eine Kontext-Seite zentriert auf den Post ```nr``` auf. (wie t.me/s/<kanalname>/<nr>) - Die Kontext-Seite zeigt bis zu 16 Posts und zentriert i.d.R. auf den gegebenen Post.
### tgc_read_range(cname, n1=1, n2=None, save=True, describe = True) -> [dict]
Liest die Posts von n1 bis n2
### tgc_read_number(cname, n = 20, cutoff = None, save=True, describe = True)
Beginnt beim Post mit der Nummer ```cutoff``` und versucht dann ```n``` Posts zu lesen.
## check_tg_list(posts, check_images = True) -> [dict]
* **posts**: Eine Liste von dicts (Format siehe oben: tgc_read...)
* **check_images**: AIORNOT-Prüfung auf KI-Inhalte ja/nein?
\ No newline at end of file
from .check_bsky import *
from .transcribe import ai_description
from .transcribe import ai_description, convert_mp4_to_mp3, convert_ogg_to_m4a, convert_ogg_to_mp3
from .detectora import query_detectora
from .aiornot import query_aiornot
from .check_tg import tgc_clean, tgc_read, tgc_blockread, tgc_read_url, tgc_profile
\ No newline at end of file
from .check_wrappers import aiornot_wrapper, detectora_wrapper, bsky_aiornot_wrapper
from .check_tg import tgc_clean, tgc_read, tgc_blockread, tgc_read_url, tgc_profile, tgc_read_range, tgc_read_number, check_tg_list
\ No newline at end of file
......@@ -13,7 +13,7 @@ from datetime import datetime
import os
import re
import base64
from .transcribe import gpt4_description, transcribe
from .transcribe import gpt4_description, transcribe, convert_mp4_to_mp3, convert_ogg_to_mp3
from .check_wrappers import detectora_wrapper, aiornot_wrapper
def extract_k(n_str: str):
......@@ -35,11 +35,13 @@ def tgc_profile(channel="telegram"):
dict with the keys
- 'channel'
- 'description'
- 'subscribers', (Number)
- 'image' (base64 des Profilbilds) und 'image_url' (URL des Profilbilds)
- 'subscribers' (Number)
- 'photos' (number)
- 'videos' (number)
- 'links' (number)
- 'n_posts' (number of the last published post)
- 'created' (wann angelegt)
Example:
profile = tgc_profile("wilhelmkachel")
......@@ -61,8 +63,18 @@ def tgc_profile(channel="telegram"):
description = tgm.select_one("div.tgme_channel_info_description").get_text()
else:
description = None
img = tgm.select_one("i.tgme_page_photo_image")
if img is not None:
image_url = img.select_one("img")['src']
image = base64.b64encode(requests.get(image_url).content).decode('utf-8')
else:
image_url = None
image = None
channel_info = {'name': c,
'description': description}
'description': description,
'image_url': image_url,
'image': image,
}
for info_counter in tgm.find_all('div', class_='tgme_channel_info_counter'):
counter_value = info_counter.find('span', class_='counter_value').text.strip()
counter_type = info_counter.find('span', class_='counter_type').text.strip()
......@@ -78,6 +90,21 @@ def tgc_profile(channel="telegram"):
else:
last_post_href = tgm.select('a.tgme_widget_message_date')[-1]['href']
channel_info['n_posts'] = int(re.search(r'[0-9]+$', last_post_href).group())
# Get founding date of account.
# Dafür die seite t.me/<cname>/1 aufrufen und nach tgme_widget_message_service_date suchen
c_url = f"https://t.me/s/{c}/1"
try:
response = requests.get(c_url)
response.raise_for_status()
tgm = BeautifulSoup(response.content, 'html.parser')
except requests.exceptions.RequestException:
print(f"Warning: Channel {c} not found")
return None
# Leider scheint tgme_widget_message_service_date erst nachgeladen zu werden;
# alternativ: nimm das Datum des frühesten Posts
if tgm.select_one("time.time") is not None:
timestamp = datetime.fromisoformat(tgm.select_one("time.time")['datetime']).isoformat()
channel_info['created'] = timestamp
return channel_info
......@@ -131,9 +158,9 @@ def tg_post_parse(b, save = True, describe = True):
# Postnummer, Zeitstempel (auch wenn er in Einzel-Posts als datetime auftaucht und in Channel_seiten als time)
b_nr = int(re.search(r'[0-9]+$', b.select_one("a.tgme_widget_message_date")['href']).group())
if b.select_one("time.time") is not None:
timestamp = datetime.fromisoformat(b.select_one("time.time")['datetime'])
timestamp = datetime.fromisoformat(b.select_one("time.time")['datetime']).isoformat()
else: # Einzel-Post
timestamp = datetime.fromisoformat(b.select_one("time.datetime")['datetime'])
timestamp = datetime.fromisoformat(b.select_one("time.datetime")['datetime']).isoformat()
#
if b.select_one("span.tgme_widget_message_views") is not None:
views = extract_k(b.select_one("span.tgme_widget_message_views").get_text())
......@@ -211,11 +238,12 @@ def tg_post_parse(b, save = True, describe = True):
video = {'url': video_url,
'thumbnail': video_thumbnail_url,
}
photo = {
'url': video_thumbnail_url,
'image': base64.b64encode(requests.get(video_thumbnail_url).content).decode('utf-8')
}
if save or describe:
# Thumbnail wird unter photo abgespeichert
photo = {'url': video_thumbnail_url,
'image': base64.b64encode(requests.get(video_thumbnail_url).content).decode('utf-8')
}
photo['file'] = save_url(video_thumbnail_url, f"{channel}_{b_nr}_photo")
else:
video = {'url': video_url,
......@@ -279,6 +307,7 @@ def tgc_read_url(channel_url, save=True, describe = False):
response.raise_for_status()
tgm = BeautifulSoup(response.content, 'html.parser')
# Error message?
print("'",end="")
if tgm.select_one("div.tgme_widget_message_error") is not None:
print(f"Fehler beim Lesen von {channel_url}")
return None
......@@ -328,17 +357,12 @@ def tgc_read_range(cname, n1=1, n2=None, save=True, describe = True):
max_nr = profile['n_posts']
if n1 > max_nr:
return None
loop = True
n = n1
posts = []
while loop:
new_posts = tgc_blockread(cname, n1, save, describe)
nr_values = [post['nr'] for post in new_posts]
last_nr = max(nr_values)
# Abbruchbedingungen: Letzten Post des Channels erreicht, oder Ende des zu lesenden Bereichs
loop = (max_nr == last_nr) or (last_nr > n2)
posts.extend(new_posts)
# Posts aufsteigend sortieren
posts.sort(key=lambda x: x['nr'])
while n <= n2:
new_post = tgc_read(cname, n, save, describe)
n = n + 1
posts.append(new_post)
return posts
def tgc_read_number(cname, n = 20, cutoff = None, save=True, describe = True):
......@@ -389,10 +413,14 @@ def check_tg_list(posts, check_images = True):
if 'aiornot_ai_score' not in post:
if post['video'] is not None:
# Audio des Videos analysieren
post['aiornot_ai_score'] = aiornot_wrapper(post['video'].get('file'), is_image = False)
fname = post['video'].get('file')
post['aiornot_ai_score'] = aiornot_wrapper(convert_mp4_to_mp3(fname), is_image = False)
elif post['photo'] is not None:
# Bild analysieren
post['aiornot_ai_score'] = aiornot_wrapper(post['photo'].get('file'), is_image = True)
elif post['voice'] is not None:
fname = post['voice'].get('file')
post['aiornot_ai_score'] = aiornot_wrapper(convert_ogg_to_mp3(fname), is_image = False)
return posts
# Wrapper für die check_tg_list Routine.
# Gibt Resultate als df zurück, arbeitet aber hinter den Kulissen mit
......
......@@ -36,18 +36,30 @@ def aiornot_wrapper(content, is_image = True):
print("?", end="")
is_url = (content.startswith("http://") or content.startswith("https://"))
if is_image:
response = aiornot_client.image_report_by_url(content) if is_url else aiornot_client.image_report_by_file(content)
try:
response = aiornot_client.image_report_by_url(content) if is_url else aiornot_client.image_report_by_file(content)
except Exception as e:
print(f"AIORNOT-Image-API-Fehler: {e}")
return None
else:
response = aiornot_client.audio_report_by_url(content) if is_url else aiornot_client.audio_report_by_file(content)
# Achtung: DERZEIT (13.1.25) verarbeitet die Audio-API nur MP3-Dateien, keine MP4/M4A.
# Und Ogg schon gleich zweimal nicht.
# Sie gibt auch noch keinen Confidence-Wert zurück, anders als dokumentiert.
try:
response = aiornot_client.audio_report_by_url(content) if is_url else aiornot_client.audio_report_by_file(content)
except Exception as e:
print(f"AIORNOT-Audio-API-Fehler: {e}")
return None
# Beschreibung: https://docs.aiornot.com/#5b3de85d-d3eb-4ad1-a191-54988f56d978
if response is not None:
aiornot_dict = ({
'aiornot_score': response.report.verdict,
'score': response.report.verdict,
# Unterscheidung: Bilder haben den Confidence score im Unter-Key 'ai'
'aiornot_confidence': response.report.ai.confidence if hasattr(response.report, 'ai') else response.report.confidence,
'aiornot_generator': response.report.generator if hasattr(response.report, 'generator') else None,
# Audios SOLLTEN eien Confidence-Wert in response.report.confidence haben, haben es aber nicht
'confidence': response.report.ai.confidence if hasattr(response.report, 'ai') else .99,
'generator': response.report.generator if hasattr(response.report, 'generator') else None,
})
print(f"\b{'X' if aiornot_dict['aiornot_score'] != 'human' else '.'}",end="")
print(f"\b{'X' if aiornot_dict['score'] != 'human' else '.'}",end="")
return aiornot_dict
else:
print("\b,")
......
......@@ -8,6 +8,7 @@ from openai import OpenAI
from pathlib import Path
import os
import whisper
from pydub import AudioSegment # für die OGG-zu-MP4-Konversion
prompt = """Du bist Barrierefreiheits-Assistent.
Du erstellst eine deutsche Bildbeschreibung für den Alt-Text.
......@@ -74,7 +75,7 @@ def transcribe(fname, use_api = False):
#
# Als erstes: Das in Telegram übliche .ogg-Audioformat konvertieren
if ".ogg" in fname.lower():
fname = convert_ogg_to_m4a(fname)
fname = convert_ogg_to_mp3(fname)
try:
if use_api:
text = transcribe_api(fname)
......@@ -86,18 +87,39 @@ def transcribe(fname, use_api = False):
except:
return ""
from pydub import AudioSegment
def convert_ogg_to_m4a(input_file):
# Load the OGG file
try:
audio = AudioSegment.from_ogg(input_file)
# Export the audio to an M4A file
output_file = Path(input_file).with_suffix('.m4a')
audio.export(output_file, format="m4a")
output_file = os.path.splitext("./media/fragunsdochDasOriginal_27176_voice.ogg")[0]+".m4a"
audio.export(output_file, format="mp4")
return output_file
except:
return None
def convert_ogg_to_mp3(input_file):
# Load the OGG file
try:
audio = AudioSegment.from_ogg(input_file)
# Export the audio to an M4A file
output_file = os.path.splitext(input_file)[0]+".mp3"
audio.export(output_file, format="mp3")
return output_file
except:
return None
def convert_mp4_to_mp3(input_file):
# Load the video file
try:
audio = AudioSegment.from_file(input_file, format="mp4")
# Export the audio to an MP3 file
output_file = os.path.splitext(input_file)[0]+".mp3"
audio.export(output_file, format="mp3")
return output_file
except:
return None
def transcribe_api(fname):
client = OpenAI()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment