Skip to content
Snippets Groups Projects
Commit 1177e1ba authored by Jan Eggers's avatar Jan Eggers
Browse files

Zwischenstand

parent d7ba9e6a
No related branches found
No related tags found
No related merge requests found
No preview for this file type
from src.aichecker.check_tg import *
from src.aichecker.detectora import query_detectora
from src.aichecker.imagecheck import query_aiornot
from src.aichecker.aiornot import query_aiornot
TEST = False
def count_posts(posts, threshold):
text_count = 0
score_count = 0
# Hilfsfunktion: CSV einlesen und als df ausgeben
def reimport_csv(fname):
df = pd.read_csv(fname)
# Diese Spalten sind dict:
structured_columns = ['photo', 'sticker', 'video', 'voice', 'forward', 'links']
for c in structured_columns:
df[c] = df[c].apply(convert_to_obj)
# AIORNOT-Bewertung sind dict
df['aiornot_ai_score'] = df['aiornot_ai_score'].apply(convert_to_obj)
return df
for post in posts:
if 'text' in post:
text_count += 1
if __name__ == "__main__":
# tg_check
handle_str = input("Handle des Kanals eingeben: ")
#handle_str = "telegram"
handle = tgc_clean(handle_str)
profile_dict = tgc_profile(handle)
last_post = profile_dict['n_posts']
if profile_dict is None:
profile = tgc_profile(handle)
if profile is None:
print("Kein Konto mit diesem Namen gefunden.")
exit()
print(f"Analysiert wird: {profile_dict['name']}")
print(f"{profile_dict['description']}")
last_post = profile['n_posts']
print(f"Analysiert wird: {profile['name']}")
print(f"{profile['description']}")
print()
print(f"Subscriber: {profile_dict['subscribers']}")
print(f"Posts: {profile_dict['n_posts']}")
print(f"Fotos: {profile_dict['photos']}")
print(f"Videos: {profile_dict['videos']}")
print(f"Links: {profile_dict['links']}")
print(f"Subscriber: {profile['subscribers']}")
print(f"Posts: {profile['n_posts']}")
print(f"Fotos: {profile['photos']}")
print(f"Videos: {profile['videos']}")
print(f"Links: {profile['links']}")
print()
if TEST:
# Lies eine Seite (mit bis zu 16 Posts), ohne Mediendateien anzulegen
# und ohne Audios zu transkribieren
posts = tgc_blockread(profile_dict['name'],nr=1, save=False, describe=False)
posts = tgc_blockread(profile['name'],nr=1, save=False, describe=False)
# Jetzt die aktuellsten Posts, mit Transkription/Mediendateien
#posts = tgc_read(channels_dict['name'],nr=None, save=True, transcribe=True)
#print(posts)
# Nur ein einzelner Post
posts = tgc_read(profile_dict['name'],nr=last_post)
posts = tgc_read(profile['name'],nr=last_post)
print(posts)
# Über die Post-URL
print(tgc_read_url('https://t.me/telegram/46',save=True, describe=True))
# Ein Bereich
posts = tgc_read_range(profile_dict['name'], last_post - 19, last_post, save = True, describe= True)
posts = tgc_read_range(profile['name'], last_post - 19, last_post, save = True, describe= True)
# Ein einzelner Post mit Video, Vorschaubild und Text
posts = tgc_read_range("telegram", 295, 295, True, True)
posts = tgc_read_range("fragunsdochDasOriginal", 27170, 27170, True, True)
post = posts[0]
print("KI-Check:")
if 'detectora_ai_score' not in post:
......@@ -53,15 +57,18 @@ if __name__ == "__main__":
# post['detectora_ai_score'] = detectora_wrapper(post['text'])
print(f"Detectora-Score: {query_detectora(post['text'])}")
if 'aiornot_ai_score' not in post:
if post['photo'] is not None:
if post['video'] is not None:
# Audio des Videos analysieren
post['aiornot_ai_score'] = aiornot_wrapper(post['video'].get('url'), is_image = False)
print("Video: AIORNOT-Score")
# Bild analysieren
# Das hier ist für die Galerie: AIORNOT kann derzeit
# keine base64-Strings checken.
# Das Problem an den URLs der Photos ist: sie sind nicht garantiert.
base64_image = post['photo'].get('image',None)
image = f"data:image/jpeg;base64, {base64_image}"
#post['aiornot_ai_score'] = aiornot_wrapper(post['photo'].get('url'))
print("AIORNOT-AI-Score: {query_aiornot(post['photo']['url']}")
post['aiornot_ai_score'] = aiornot_wrapper(post['photo'].get('url'))
print("AIORNOT-AI-Score: {post['aiornot_ai_score']}")
# Videos kann man nur über das Audio auf KI checken.
# Muss ich erst noch implementieren.
# Die telegram-Videos haben kein Audio; deshalb ist das hier nicht schlimm
......@@ -71,8 +78,9 @@ if __name__ == "__main__":
os.makedirs('tg-checks')
filename = f'tg-checks/{handle}.csv'
if os.path.exists(filename):
existing_df = pd.read_csv(filename)
print(f"Dieser Kanal wurde schon einmal ausgelesen, zuletzt: {max(existing_df[''])}")
existing_df = reimport_csv(filename)
max_nr = max(existing_df['nr'])
print(f"Dieser Kanal wurde schon einmal ausgelesen, zuletzt Post Nr.: {max_nr} - seitdem {last_post-max_nr} neue Posts")
# Lies die 20 aktuellsten Posts, sichere und analysiere sie
#
# KONSTANTEN
......@@ -89,26 +97,29 @@ if __name__ == "__main__":
n_ai_images = 0
n_texts = 0
n_ai_texts = 0
n_videos = 0
n_ai_videos = 0
for post in checked_posts:
if post['text'] is not None:
n_texts += 1
# Detectora-Score für diesen Text abrufen; wenn über der Schwelle,
# KI-Texte um eins hochzählen
n_ai_texts += 1 if posts.get('detectora_ai_score',0) > DETECTORA_T else 0
if post['image'] is not None:
n_ai_texts += 1 if post.get('detectora_ai_score',0) > DETECTORA_T else 0
if post['photo'] is not None:
n_images += 1
try:
# Abruf des Keys kann scheitern, wenn kein Score, deshalb mit Try
ai_score = post['aiornot_ai_score']['ai']['confidence']
except:
# Kein Key abrufbar? Score 0
ai_score = 0
ai_score = post['aiornot_ai_score'].get('confidence',0)
n_ai_images += 1 if ai_score > AIORNOT_T else 0
if post['video'] is not None:
n_videos += 1
ai_score = post['aiornot_ai_score'].get('confidence', 0)
n_ai_videos += 1 if ai_score > AIORNOT_T else 0
print(f"In den {N} Posts: ")
print(f" - Texte: {n_texts}, davon KI-verdächtig: (Schwelle: {n_ai_texts})")
print(f" - Bilder: {n_images}, davon KI-verdächtig: {n_ai_images}")
print(f" - Texte: {n_texts}, davon KI-verdächtig: {n_ai_texts} (Schwelle: {DETECTORA_T})")
print(f" - Bilder: {n_images}, davon KI-verdächtig: {n_ai_images} (Schwelle: {AIORNOT_T})")
print(f"Ergebnis wird in 'tg-checks/{handle}.csv' mit abgespeichert. ")
df = pd.DataFrame(posts)
if ('existing_df' in globals()):
df = pd.concat([existing_df, df]).drop_duplicates(subset=['uri']).reset_index(drop=True)
df.to_csv(f'tg-checks/{handle}.csv', index=False) # Save to CSV for example
from .check_bsky import *
from .transcribe import ai_description
from .detectora import query_detectora
from .imagecheck import query_aiornot
from .aiornot import query_aiornot
from .check_tg import tgc_clean, tgc_read, tgc_blockread, tgc_read_url, tgc_profile
\ No newline at end of file
# imagecheck.py
# aiornot.py
# Erfragt KI-Wahrscheinlichkeit für ein Bild über Hive- und AIorNot-API
#
# Inzwischen entdeckt: brauchen wir eigentlich nicht.
# https://github.com/aiornotinc/aiornot-python
from .transcribe import ai_description
import requests
......@@ -8,25 +11,34 @@ import os
import time
# Konstanten #
endpoint_url = "https://api.aiornot.com/v1/reports/image"
image_endpoint_url = "https://api.aiornot.com/v1/reports/image"
audio_endpoint_url = "https://api.aiornot.com/v1/reports/audio"
def query_aiornot(image):
# Erwartet URI eines Bildes.
def query_aiornot(content, is_image = False):
# Erwartet URI eines Bildes (Bildcheck)
#
# Der Detektor kann die Typen image/apng, image/gif, image/jpeg, image/png, image/svg+xml, image/webp verarbeiten.
#
# Derzeit kann die AIORNOT-API keine base64-Bilder verarbeiten; d.h.: Eine URI der Form
# "data:image/jpeg;base64, ..." führt zu einem 400-Fehler.
# (Also in diesem Fall: Datei abspeichern und über files= hochladen. )
#
# Wichtigste Rückgabewerte im dict:
# - 'verdict' ('human' oder 'ai')
# - 'ai'/'confidence' (wie sicher ist sich das Modell?)
# - 'generator' ist ein dict, das für die vier geprüften Modelle
# - 'ai'/'confidence' bzw. 'confidence' für Audio-Checks (wie sicher ist sich das Modell?)
# - bei Bildern: 'generator' ist ein dict, das für die vier geprüften Modelle
# 'dall_e', 'stable_diffusion', 'this_person_does_not_exist' und 'midjourney'
# jeweils einen 'confidence'-Wert angibt.
#
# AIORNot-API-Dokumentation: https://docs.aiornot.com/#5b3de85d-d3eb-4ad1-a191-54988f56d978
if is_image:
endpoint_url = image_endpoint_url
else:
endpoint_url = audio_endpoint_url
data = json.dumps({
'object': image,
'object': content,
})
api_key = os.environ.get('AIORNOT_API_KEY')
headers = {
......@@ -35,20 +47,34 @@ def query_aiornot(image):
'Accept': 'application/json',
}
# Base64-Datei? Temporären File abspeichern und über files= hochladen
if image.startswith("data:image/"):
if content.startswith("data:image/"):
headers = {
'Authorization': f"Bearer {api_key}",
'Accept': 'application/json',
}
fname = save_string_to_temp(content)
try:
response = requests.post(endpoint_url,
headers=headers,
files={'object': open(fname, 'rb')})
except Exception as e:
print("Fehler beim Verbinden mit der AIORNOT-API (Bild) über multipart:", str(e))
return None
# Dateiname? Dann mit Multipart-Header
if not (content.startswith("http://") or content.startswith("https://")):
fname =
headers = {
'Authorization': f"Bearer {api_key}",
'Accept': 'application/json',
}
fname = save_string_to_temp(image)
try:
response = requests.post(endpoint_url,
headers=headers,
files={'object': open(fname, 'rb')})
except Exception as e:
print("Fehler beim Verbinden mit der AIORNOT-API über multipart:", str(e))
print("Fehler beim Verbinden mit der AIORNOT-API (Bild) über multipart:", str(e))
return None
try:
response = requests.post(endpoint_url,
headers=headers,
......@@ -61,7 +87,7 @@ def query_aiornot(image):
# Success
return response.json()['report']
elif response.status_code == 400:
print("AIORNOT: Fehlerhafte API-Anfrage")
print("AIORNOT: Fehlerhafte API-Anfrage {}")
return None
elif response.status_code == 401:
print(f"AIORNOT-API-Key 'api_key' nicht gültig")
......@@ -82,9 +108,9 @@ def query_aiornot(image):
return None
# Hilfsfunktion: base64 als Temp-File speichern
# Example base64 image string: "data:image/jpeg;base64,/9j/4AAQSkZJRgABAQEAYABgAAD..."
import base64
# Example base64 image string: "data:image/jpeg;base64,/9j/4AAQSkZJRgABAQEAYABgAAD..."
def save_string_to_temp(image, fname="./temp"):
header, encoded = image.split(",", 1)
# Leerzeichen entfernen
......@@ -99,3 +125,4 @@ def save_string_to_temp(image, fname="./temp"):
with open(file_name, "wb") as image_file:
image_file.write(image_data)
return file_name
......@@ -54,6 +54,9 @@ def tgc_profile(channel="telegram"):
except requests.exceptions.RequestException:
print(f"Warning: Channel {c} not found")
return None
# Kein Channel? Channel haben immer wenigstens einen Namen in der Infokarte
if tgm.select_one("div.tgme_channel_info") is None:
return None
if tgm.select_one("div.tgme_channel_info_description") is not None:
description = tgm.select_one("div.tgme_channel_info_description").get_text()
else:
......@@ -63,12 +66,18 @@ def tgc_profile(channel="telegram"):
for info_counter in tgm.find_all('div', class_='tgme_channel_info_counter'):
counter_value = info_counter.find('span', class_='counter_value').text.strip()
counter_type = info_counter.find('span', class_='counter_type').text.strip()
# Sonderbedingungen: nur 1 Link, nur 1 Foto, nur 1 Video? Umbenennen für Konsistenz
if counter_type in ['photo', 'video', 'link', 'subscriber']:
counter_type += "s"
channel_info[counter_type] = extract_k(counter_value)
# The last post is visible on this page. Gather its number and date.
# Wenn das Konto noch nicht gepostet hat: Abbruch.
if tgm.select_one("div.tgme_widget_message") is None:
channel_info['n_posts'] = 0
else:
last_post_href = tgm.select('a.tgme_widget_message_date')[-1]['href']
channel_info['n_posts'] = int(re.search(r'[0-9]+$', last_post_href).group())
return channel_info
......@@ -140,7 +149,7 @@ def tg_post_parse(b, save = True, describe = True):
hashtags = [a['href'][3:] for a in textlinks if a['href'].startswith("?q=")]
### Die möglichen Content-Abschnitte eines Posts ###
# Text
if b.select_one("div.tgme_widget_message_text_wrap") is not None:
if b.select_one("div.tgme_widget_message_text") is not None:
text = b.select_one("div.tgme_widget_message_text").get_text()
# Polls: Text der Optionen extrahieren
elif b.select_one("div.tgme_widget_message_poll") is not None:
......@@ -374,7 +383,10 @@ def check_tg_list(posts, check_images = True):
# Okay, es geht weiter: Bilder auf KI prüfen
for post in posts:
if 'aiornot_ai_score' not in post:
if post['photo'] is not None:
if post['video'] is not None:
# Audio des Videos analysieren
post['aiornot_ai_score'] = aiornot_wrapper(post['video'].get('file'), is_image = False)
elif post['photo'] is not None:
# Bild analysieren
# Das hier ist für die Galerie: AIORNOT kann derzeit
# keine base64-Strings checken.
......
from .detectora import query_detectora
from .imagecheck import query_aiornot
from .aiornot import query_aiornot
from .transcribe import gpt4_description
# Konstante
......@@ -19,23 +19,27 @@ def detectora_wrapper(text: str):
print(f"\b{'X' if score >= d_thresh else '.'}",end="")
return score
def aiornot_wrapper(image):
def aiornot_wrapper(content, is_image = True):
# Verpackung. Fortschrittsbalken.
if image is None:
if content is None:
print(" ", end="")
return
# Fortschrittsbalken
print("?", end="")
aiornot_report = query_aiornot(image)
report = query_aiornot(content, is_image)
# Beschreibung: https://docs.aiornot.com/#5b3de85d-d3eb-4ad1-a191-54988f56d978
if report is not None:
aiornot_dict = ({
'link_id': image,
'aiornot_score': aiornot_report['verdict'],
'aiornot_confidence': aiornot_report['ai']['confidence'],
'aiornot_generator': aiornot_report['generator'],
'aiornot_score': report['verdict'],
# Unterscheidung: Bilder haben den Confidence score im Unter-Key 'ai'
'aiornot_confidence': report['ai']['confidence'] if 'ai' in report else report['confidence'],
'aiornot_generator': report['generator'] if 'generator' in report else 'Audio',
})
print(f"\b{'X' if aiornot_dict['aiornot_score'] != 'human' else '.'}",end="")
return aiornot_dict
else:
print("\b,")
return None
def bsky_aiornot_wrapper(did,embed):
......
......@@ -66,19 +66,47 @@ def ai_description(image):
# Return ai-generated description
return desc2
def transcribe(audio):
def transcribe(fname):
# Wrapper; ruft eine der drei Whisper-Transcribe-Varianten auf.
# Favorit: das beschleunigte whisper-s2t
# (das aber erst CTranslate2 mit METAL-Unterstützung braucht auf dem Mac
# bzw. CUDA auf Windows-Rechnern)
#
# Als erstes: Das in Telegram übliche .ogg-Audioformat konvertieren
if ".ogg" in fname.lower():
fname = convert_ogg_to_m4a(fname)
try:
text = transcribe_whisper(audio)
text = transcribe_whisper(fname)
#text = transcribe_api(fname)
# return transcribe_jax(audio)
# return transcribe_ws2t(audio)
return text
except:
return ""
from pydub import AudioSegment
def convert_ogg_to_m4a(input_file):
# Load the OGG file
try:
audio = AudioSegment.from_ogg(input_file)
# Export the audio to an M4A file
output_file = Path(input_file).with_suffix('.m4a')
audio.export(output_file, format="m4a")
except:
return None
def transcribe_api(fname):
client = OpenAI()
audio_file= open(fname, "rb")
transcription = client.audio.transcriptions.create(
model="whisper-1",
file=audio_file
)
return (transcription.text)
def transcribe_whisper(fname, model="large-v3-turbo"):
# Vanilla Whisper. Womöglich nicht die schnellste Lösung.
# Installiere einfach mit
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment