Skip to content
Snippets Groups Projects
Commit b71e39e4 authored by Jan Eggers's avatar Jan Eggers
Browse files

Korrektur

parent 3893c1e8
No related branches found
No related tags found
No related merge requests found
......@@ -10,7 +10,7 @@ authors = [
maintainers = [
{name = "Jan Eggers", email = "jan.eggers@hr.de"},
]
version = "0.2.4" # Neue Versionsnummern für pip-Update
version = "0.2.4.1" # Neue Versionsnummern für pip-Update
description = "Bluesky- und Telegram-Konten auf KI-Inhalte checken"
requires-python = ">=3.8"
dependencies = [
......
......@@ -10,9 +10,11 @@ import pandas as pd
import requests
from bs4 import BeautifulSoup
from datetime import datetime
from dateutil.parser import isoparse
import os
import re
import base64
import logging
from .transcribe import gpt4_description, transcribe, convert_mp4_to_mp3, convert_ogg_to_mp3
from .check_wrappers import detectora_wrapper, aiornot_wrapper
......@@ -49,12 +51,13 @@ def tgc_profile(channel="telegram"):
"""
c = tgc_clean(channel)
c_url = f"https://t.me/s/{c}"
logging.info(f"Lese Info aus Channel {c}")
try:
response = requests.get(c_url)
response.raise_for_status()
tgm = BeautifulSoup(response.content, 'html.parser')
except requests.exceptions.RequestException:
print(f"Warning: Channel {c} not found")
logging.warning(f"Warning: Channel {c} not found")
return None
# Kein Channel? Channel haben immer wenigstens einen Namen in der Infokarte
if tgm.select_one("div.tgme_channel_info") is None:
......@@ -98,12 +101,12 @@ def tgc_profile(channel="telegram"):
response.raise_for_status()
tgm = BeautifulSoup(response.content, 'html.parser')
except requests.exceptions.RequestException:
print(f"Warning: Channel {c} not found")
logging.warning(f"Warning: Channel {c} not found")
return None
# Leider scheint tgme_widget_message_service_date erst nachgeladen zu werden;
# alternativ: nimm das Datum des frühesten Posts
if tgm.select_one("time.time") is not None:
timestamp = datetime.fromisoformat(tgm.select_one("time.time")['datetime']).isoformat()
timestamp = isoparse(tgm.select_one("time.time")['datetime']).isoformat()
channel_info['created'] = timestamp
return channel_info
......@@ -140,14 +143,14 @@ def save_url(fname, name, mdir="./media"):
try:
os.makedirs(os.path.dirname(content_file), exist_ok=True)
except:
print(f"Kann kein Media-Directory in {mdir} öffnen")
logging.error(f"Kann kein Media-Directory in {mdir} öffnen")
return None
try:
with open(content_file, 'wb') as f:
f.write(requests.get(fname).content)
return content_file
except:
print(f"Kann Datei {content_file} nicht schreiben")
logging.error(f"Kann Datei {content_file} nicht schreiben")
return None
def get_channel_from_url(channel:str):
......@@ -157,10 +160,11 @@ def tg_post_parse(b, save = True, describe = True):
# Immer vorhanden:
# Postnummer, Zeitstempel (auch wenn er in Einzel-Posts als datetime auftaucht und in Channel_seiten als time)
b_nr = int(re.search(r'[0-9]+$', b.select_one("a.tgme_widget_message_date")['href']).group())
logging.info(f"Parse Telegram-Post Nr. {b_nr}")
if b.select_one("time.time") is not None:
timestamp = datetime.fromisoformat(b.select_one("time.time")['datetime']).isoformat()
timestamp = isoparse(b.select_one("time.time")['datetime']).isoformat()
else: # Einzel-Post
timestamp = datetime.fromisoformat(b.select_one("time.datetime")['datetime']).isoformat()
timestamp = isoparse(b.select_one("time.datetime")['datetime']).isoformat()
#
if b.select_one("span.tgme_widget_message_views") is not None:
views = extract_k(b.select_one("span.tgme_widget_message_views").get_text())
......@@ -311,9 +315,10 @@ def tgc_read_url(channel_url, save=True, describe = True):
response.raise_for_status()
tgm = BeautifulSoup(response.content, 'html.parser')
# Error message?
logging.info(f"Lese Einzelpost: {channel_url}")
print("'",end="")
if tgm.select_one("div.tgme_widget_message_error") is not None:
print(f"Fehler beim Lesen von {channel_url}")
logging.error(f"Fehler beim Lesen von {channel_url}")
return None
b = tgm.select_one("div.tgme_widget_message")
return tg_post_parse(b, save, describe)
......@@ -343,6 +348,7 @@ def tgc_blockread(cname="telegram", nr=None, save=True, describe=False):
# Nur einen Post holen? Dann t.me/<channel>/<nr>,
# sonst t.me/s/<channel>/<nr>
channel_url = f"https://t.me/s/{c}/{nr}"
logging.info(f"Lese Telegram-Channel {c}, Block um den Post {nr}")
response = requests.get(channel_url)
response.raise_for_status()
tgm = BeautifulSoup(response.content, 'html.parser')
......@@ -440,10 +446,19 @@ def check_tg_list(posts, check_images = True):
# Gibt Resultate als df zurück, arbeitet aber hinter den Kulissen mit
# einer Liste von dicts (anders als check_bsky)
def check_tgc(cname, n=20, cursor = None, check_images = True):
exit("Funktion noch nicht definiert")
return None
def tg_hydrate(posts):
# Nimmt eine Liste von Posts und zieht die zugehörigen Dateien,
# erstellt Beschreibungen und Transkriptionen.
#
# Fernziel: Asynchrone Verarbeitung.
return posts
def tg_evaluate(posts, check_texts = True, check_images = True):
# Nimmt eine Liste von Posts und ergänzt KI-Einschätzung von Detectora
# und AIORNOT.
for post in posts:
return posts
def retrieve_tg_csv(cname, path= "tg-checks"):
fname = path + "/" + cname + ".csv"
......
from .detectora import query_detectora
# from .aiornot import query_aiornot
from .transcribe import gpt4_description
import logging
# Alternative zu meinen selbst geschriebenen aiornot-Routinen:
# https://github.com/aiornotinc/aiornot-python
......@@ -54,7 +55,7 @@ def aiornot_wrapper(content, is_image = True):
try:
response = aiornot_client.image_report_by_url(content) if is_url else aiornot_client.image_report_by_file(content)
except Exception as e:
print(f"AIORNOT-Image-API-Fehler: {e}")
logging.error(f"AIORNOT-Image-API-Fehler: {e}")
return None
else:
# Achtung: DERZEIT (13.1.25) verarbeitet die Audio-API nur MP3-Dateien, keine MP4/M4A.
......@@ -63,7 +64,7 @@ def aiornot_wrapper(content, is_image = True):
try:
response = aiornot_client.audio_report_by_url(content) if is_url else aiornot_client.audio_report_by_file(content)
except Exception as e:
print(f"AIORNOT-Audio-API-Fehler: {e}")
logging.error(f"AIORNOT-Audio-API-Fehler: {e}")
return None
# Beschreibung: https://docs.aiornot.com/#5b3de85d-d3eb-4ad1-a191-54988f56d978
if response is not None:
......
......@@ -22,6 +22,7 @@ Detectora-Key muss als DETECTORA_API_KEY in .env hinterlegt sein.
import requests
import json
import os
import logging
# os.environ.get('OPENAI_API_KEY')
......@@ -31,12 +32,13 @@ api_url = "https://backendkidetektor-apim.azure-api.net/watson"
def query_detectora(text):
if text == '':
return None
logging.info(f"Checke Text mit Detectora: {text[:20]}...")
data = {
'query': text,
}
api_key = os.environ.get('DETECTORA_API_KEY')
if api_key is None or api_key == "":
print("DETECTORA_API_KEY ist nicht gesetzt")
logging.error("DETECTORA_API_KEY ist nicht gesetzt")
return None
headers = {
'APIKey': api_key,
......@@ -51,12 +53,12 @@ def query_detectora(text):
# Success
return response.json()['fake_probability']
elif response.status_code == 400:
print(f"DETECTORA: Fehlerhafte API-Anfrage: \'{data['query']}\'")
logging.error(f"DETECTORA: Fehlerhafte API-Anfrage: \'{data['query']}\'")
return None
elif response.status_code == 401:
print(f"DETECTORA_API_KEY {api_key} nicht gültig")
logging.error(f"DETECTORA_API_KEY {api_key} nicht gültig")
return None
except Exception as e:
print("Fehler beim Verbinden mit der DETECTORA-API:", str(e))
logging.error("Fehler beim Verbinden mit der DETECTORA-API:", str(e))
return None
......@@ -8,6 +8,7 @@ from openai import OpenAI
from pathlib import Path
import os
import whisper
import logging
from pydub import AudioSegment # für die OGG-zu-MP4-Konversion
prompt = """Du bist Barrierefreiheits-Assistent.
......@@ -93,10 +94,11 @@ def convert_ogg_to_m4a(input_file):
try:
audio = AudioSegment.from_ogg(input_file)
# Export the audio to an M4A file
output_file = os.path.splitext("./media/fragunsdochDasOriginal_27176_voice.ogg")[0]+".m4a"
output_file = os.path.splitext(input_file)[0]+".m4a"
audio.export(output_file, format="mp4")
return output_file
except:
logging.error(f"Konnte Datei {input_file} nicht von OGG nach M4A wandeln")
return None
def convert_ogg_to_mp3(input_file):
......@@ -108,6 +110,7 @@ def convert_ogg_to_mp3(input_file):
audio.export(output_file, format="mp3")
return output_file
except:
logging.error(f"Konnte Datei {input_file} nicht von OGG nach MP3 wandeln")
return None
def convert_mp4_to_mp3(input_file):
......@@ -119,6 +122,7 @@ def convert_mp4_to_mp3(input_file):
audio.export(output_file, format="mp3")
return output_file
except:
logging.error(f"Konnte Datei {input_file} nicht von MP4 nach MP3 wandeln")
return None
def transcribe_api(fname):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment