python.traduction/Traduction/main.py

405 lines
15 KiB
Python

import PyPDF2
import requests
import json
from reportlab.lib.pagesizes import letter
from reportlab.lib.units import inch
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.enums import TA_JUSTIFY
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
import os, time
# Configuration
PDF_PATH = "Traduction/TaniaBorecMemoir(Ukr).pdf"
OLLAMA_MODEL = "traductionUkrainienVersFrancais:latest"
OLLAMA_URL = "http://localhost:11434/api/generate"
TARGET_LANGUAGE = "français"
CHECKPOINT_FILE = "Traduction/checkpoint.json"
TEMP_OUTPUT_TXT = "Traduction/output_temp.txt"
FINAL_OUTPUT_PDF = PDF_PATH.replace(".pdf",f"({TARGET_LANGUAGE.upper()[:2]})_V9.pdf")
FINAL_OUTPUT_TXT = PDF_PATH.replace(".pdf",f"({TARGET_LANGUAGE.upper()[:2]})_V9.txt")
DEBUG = True
def extract_parameters_from_template(template_str):
"""Extrait les paramètres du modèle à partir du template."""
import re
parameters = {}
if not template_str or not isinstance(template_str, str):
return parameters
# Si la chaîne contient "parameters:", récupère ce qui suit
if 'parameters:' in template_str:
params_section = template_str.split('parameters:', 1)[1]
else:
# Sinon, utilise la chaîne directement (elle contient déjà les paramètres)
params_section = template_str
# Parse les lignes de paramètres
# Format: "stop "<end_of_turn>""
# "temperature 0.1"
lines = params_section.split('\n')
for line in lines:
if not line.strip():
continue
# Divise par le premier groupe d'espaces blancs
# Cela sépare la clé des valeurs avec leurs espaces
parts = line.split(None, 1) # split() avec maxsplit=1 divise sur les espaces
if len(parts) == 2:
param_name = parts[0].strip()
param_value = parts[1].strip()
parameters[param_name] = param_value
return parameters
def get_llm_model_info(model=OLLAMA_MODEL):
"""
Extrait les informations du modèle LLM depuis Ollama, y compris le nom depuis la ligne FROM du Modelfile.
@param model: Nom du modèle à interroger.
@type model: str
@return: Dictionnaire contenant les informations du modèle, ou None en cas d'erreur.
@rtype: dict | None
"""
try:
# Chemin vers le fichier Modelfile (supposé être dans le même répertoire que le script)
modelfile_path = os.path.join(os.path.dirname(__file__), "Modelfile")
# Initialisation de model_name
model_name = "none"
# Lecture du fichier Modelfile pour extraire le nom du modèle
if os.path.exists(modelfile_path):
with open(modelfile_path, 'r', encoding='utf-8') as file:
for line in file:
if line.strip().startswith('FROM '):
model_name = line.strip().split('FROM ')[1].strip()
break
# URL pour obtenir les informations du modèle
info_url = OLLAMA_URL.replace("/api/generate", "/api/show")
payload = {"name": model}
response = requests.post(info_url, json=payload)
if response.status_code == 200:
model_data = response.json()
# Gère le cas où model_data est une chaîne
if isinstance(model_data, str):
model_data = json.loads(model_data)
# Extrait les paramètres du template
parameters = model_data.get('parameters', '')
parsed_params = extract_parameters_from_template(parameters)
# Extraction du nom depuis la ligne FROM
modelfile_content = model_data.get('Modelfile', '')
# Extraction des informations principales
info = {
"temperature": parsed_params.get('temperature', model_data.get("temperature", "Not available")),
"name": model_name,
"num_ctx": parsed_params.get('num_ctx', "Not available"),
"top_k": parsed_params.get('top_k', "Not available"),
"top_p": parsed_params.get('top_p', "Not available"),
"system": model_data.get("system", "Not available"),
"modified_at": model_data.get("modified_at", "Not available"),
}
return info
else:
print(f"Erreur lors de la récupération du modèle : {response.text}")
return None
except Exception as e:
print(f"Erreur lors de l'accès aux informations du modèle : {e}")
return None
def display_llm_info():
"""Retourne les informations du modèle LLM formatées."""
from datetime import datetime
info = get_llm_model_info(OLLAMA_MODEL)
if info:
# Formate la date en jj/mm/AAAA
modified_at = info.get('modified_at', 'Not available')
if modified_at and modified_at != 'Not available':
try:
# Parse la date ISO
date_obj = datetime.fromisoformat(modified_at.replace('Z', '+00:00'))
# Formate en jj/mm/AAAA
formatted_date = date_obj.strftime("%d/%m/%Y")
except:
formatted_date = modified_at
else:
formatted_date = 'Not available'
return f"LLM Modèle: {info['name']}<br//>\nDate de modification: {formatted_date}<br//>\nSystem: {info['system']}<br//>\nTemperature: {info['temperature']}"
else:
return "Informations du modèle non disponibles"
def register_unicode_font():
"""Enregistre une police TrueType qui supporte le cyrillique."""
# Recherche une police système qui supporte le cyrillique
font_paths = [
r"C:\Windows\Fonts\DejaVuSans.ttf",
r"C:\Windows\Fonts\Calibri.ttf",
r"C:\Windows\Fonts\arial.ttf",
]
for font_path in font_paths:
if os.path.exists(font_path):
try:
pdfmetrics.registerFont(TTFont('UnicodeFont', font_path))
return 'UnicodeFont'
except Exception as e:
print(f"Erreur lors de l'enregistrement de {font_path}: {e}")
# Si aucune police spéciale trouvée, utilise Helvetica par défaut
print("Aucune police Unicode trouvée, utilisation d'Helvetica")
return 'Helvetica'
# Charge ou initialise le checkpoint
def load_checkpoint():
if os.path.exists(CHECKPOINT_FILE):
with open(CHECKPOINT_FILE, "r") as f:
return json.load(f)
return {"last_processed_index": -1, "results": {}}
# Sauvegarde le checkpoint
def save_checkpoint(last_index, results):
# Trier les clés du dictionnaire results
sorted_results = {key: results[key] for key in sorted(results.keys(), key=int)}
with open(CHECKPOINT_FILE, "w") as f:
# Utiliser un espace d'indentation de 4 espaces pour rendre le JSON plus lisible
json.dump({"last_processed_index": last_index, "results": sorted_results}, f, indent=4)
# Sauvegarde les résultats temporaires dans un fichier TXT
def save_temp_results(results):
with open(TEMP_OUTPUT_TXT, "w", encoding="utf-8") as f:
for idx, translation in results.items():
f.write(f"Paragraphe {idx}:\n{translation}\n\n")
# Extraction du texte du PDF (inchangée)
def extract_text_from_pdf(pdf_path):
text_by_page = []
with open(pdf_path, "rb") as file:
reader = PyPDF2.PdfReader(file)
for page in reader.pages:
text = page.extract_text()
text_by_page.append(text)
return text_by_page
# Découpage en paragraphes (inchangé)
def split_pages_in_paragraphs(pages_text):
import re
full_text = "\n".join(pages_text)
full_text = re.sub(r'(?<![.!?])\n+(?![.!?])', ' ', full_text)
paragraphs = re.split(r'(?<=[.!?])\s*\n+', full_text.strip())
paragraphs = [re.sub(r'\s+', ' ', p).strip() for p in paragraphs if p.strip()]
return paragraphs
# Envoi à Ollama (inchangé)
def send_to_ollama(text, target_lang=TARGET_LANGUAGE, model=OLLAMA_MODEL):
full_prompt = f"\n\nTraduis le texte suivant de l'ukrainien vers le {target_lang} :\n{text}"
payload = {"model": model, "prompt": full_prompt, "stream": False}
response = requests.post(OLLAMA_URL, data=json.dumps(payload))
if response.status_code == 200:
return response.json()["response"]
else:
raise Exception(f"Erreur Ollama: {response.text}")
# Création du PDF final avec numéros de chapitres dans la marge
def create_pdf_from_results(results, output_path):
"""Crée un PDF à partir des résultats de traduction, avec des notes dans la marge et un numéro de page."""
story = []
font_name = register_unicode_font()
# Styles personnalisés
styles = getSampleStyleSheet()
title_style = ParagraphStyle(
'CustomTitle',
parent=styles['Heading1'],
fontSize=16,
textColor='#1f4788',
spaceAfter=0.3*inch,
alignment=TA_JUSTIFY,
fontName=font_name
)
page_style = ParagraphStyle(
'PageHeading',
parent=styles['Heading2'],
fontSize=12,
textColor='#1f4788',
spaceAfter=0.2*inch,
spaceBefore=0.2*inch,
fontName=font_name
)
body_style = ParagraphStyle(
'CustomBody',
parent=styles['BodyText'],
fontSize=11,
alignment=TA_JUSTIFY,
spaceAfter=0.2*inch,
fontName=font_name
)
note_style = ParagraphStyle(
'CustomBody',
parent=styles['BodyText'],
fontSize=8,
alignment=TA_JUSTIFY,
spaceAfter=0,
fontName=font_name
)
# Création du document avec les callbacks pour les notes et le numéro de page
doc = SimpleDocTemplate(
output_path,
pagesize=letter,
topMargin=inch,
bottomMargin=inch,
)
# Titre avec la langue cible
story.append(Paragraph(f"Traduction - Ukrainien vers {TARGET_LANGUAGE.capitalize()}", title_style))
story.append(Paragraph(f"Document : {PDF_PATH}", title_style))
story.append(Spacer(1, 0.2*inch))
# Contenu
for paragraph_num, translation in results.items():
formatted_text = translation.replace("\n", "<br/>")
if DEBUG:
# Ajoute le paragraphe avec sa note
story.append(Paragraph(paragraph_num, note_style))
story.append(Paragraph(formatted_text, body_style))
# Infos sur le LLM
story.append(Spacer(1, 0.2*inch))
story.append(Paragraph(display_llm_info(), page_style))
# Construction du PDF
doc.build(story)
print(f"PDF généré avec succès : {output_path}")
def create_txt_from_results(results, output_path):
"""Crée un fichier TXT à partir des résultats de traduction."""
OUTPUT_TXT_PATH = output_path.replace(".pdf", f".txt") # Chemin du fichier TXT de sortie
# Titre avec la langue cible
title_text = f"Traduction - Ukrainien vers {TARGET_LANGUAGE.capitalize()}"
with open(OUTPUT_TXT_PATH, 'w', encoding='utf-8') as txt_file:
txt_file.write(title_text + "\n\n")
# Contenu
for paragraph_num, translation in results.items():
# Ajoute les numéro de paragraphe et chapitre
if(DEBUG): txt_file.write(f"{paragraph_num}\n")
# Préserver la mise en page en convertissant les sauts de ligne
txt_file.write(translation + "\n\n")
# Infos sur le LLM
txt_file.write("\n")
txt_file.write(display_llm_info() + "\n")
# Fonction principale
def main():
checkpoint = load_checkpoint()
last_index = checkpoint["last_processed_index"]
results = checkpoint["results"]
pages = extract_text_from_pdf(PDF_PATH)
paragraphs = split_pages_in_paragraphs(pages)
# Liste de tous les indices de batches attendus (par pas de batch_size)
batch_size = 5
expected_batch_indices = list(range(0, len(paragraphs), batch_size))
# Liste des indices de batches déjà présents dans results
present_batch_indices = set()
for key in results.keys():
batch_start = int(int(key) // batch_size * batch_size) # Arrondit à l'indice de début de batch
present_batch_indices.add(batch_start)
# Trouve les batches manquants
missing_batches = [i for i in expected_batch_indices if i not in present_batch_indices and i <= last_index]
# Affichage des batches manquants (pour débogage)
print(f"Batches manquants détectés : {missing_batches}")
# Traduction des paragraphes manquants
temps_cumule = 0.0
for i in missing_batches:
batch = paragraphs[i:i + batch_size]
paragraph_cumul = "\n".join(batch)
print(f"{15 * '-'} Traduction des paragraphes manquants {i+1} à {min(i + batch_size, len(paragraphs))} / {len(paragraphs)}")
try:
debut_chrono = time.time()
result = send_to_ollama(paragraph_cumul)
fin_chrono = time.time()
temps_paragraphe = fin_chrono - debut_chrono
temps_cumule += temps_paragraphe
# Conversion en minutes et secondes
minutes_paragraphe, secondes_paragraphe = divmod(temps_paragraphe, 60)
minutes_cumule, secondes_cumule = divmod(temps_cumule, 60)
print(f"{result}")
results[str(i)] = result
save_checkpoint(len(paragraphs), results) # Met à jour le dernier indice du batch
save_temp_results(results)
except Exception as e:
print(f"Erreur lors de la traduction du paragraphe {i}: {e}")
print(f" Temps de traduction : {int(minutes_paragraphe)} min {secondes_paragraphe:.2f} sec")
print(f" Temps cumulé : {int(minutes_cumule)} min {secondes_cumule:.2f} sec")
# Traitement des paragraphes suivants
for i in range(last_index + 1, len(paragraphs), batch_size):
batch = paragraphs[i:i + batch_size]
paragraph_cumul = "\n".join(batch)
print(f"{15 * '-'} Traduction des paragraphes {i+1} à {min(i + batch_size, len(paragraphs))} / {len(paragraphs)}")
try:
debut_chrono = time.time()
result = send_to_ollama(paragraph_cumul)
fin_chrono = time.time()
temps_paragraphe = fin_chrono - debut_chrono
temps_cumule += temps_paragraphe
# Conversion en minutes et secondes
minutes_paragraphe, secondes_paragraphe = divmod(temps_paragraphe, 60)
minutes_cumule, secondes_cumule = divmod(temps_cumule, 60)
print(f"{result}")
results[str(i)] = result
save_checkpoint(i + batch_size - 1, results)
save_temp_results(results)
except Exception as e:
print(f"Erreur : {e}")
continue
print(f" Temps de traduction : {int(minutes_paragraphe)} min {secondes_paragraphe:.2f} sec")
print(f" Temps cumulé : {int(minutes_cumule)} min {secondes_cumule:.2f} sec")
save_temp_results(results)
create_pdf_from_results(results, FINAL_OUTPUT_PDF)
create_txt_from_results(results, FINAL_OUTPUT_TXT)
print("Traduction terminée !")
if __name__ == "__main__":
main()