import json
import re
import unicodedata
from collections import OrderedDict

# ----------------------------
# Configuration
# ----------------------------
INPUT_FILE = "paires.json"
OUTPUT_FILE = "paires_clean.json"
MIN_TOKENS = 5
MAX_TOKENS = 200
MIN_QUALITY_SCORE = 0.60

print("=== Dataset cleaning + quality scoring started ===")

# ----------------------------
# Normalization helpers
# ----------------------------
def normalize_text(text: str) -> str:
    """Apply NFKC Unicode normalization, collapse whitespace,
    and straighten curly quotes."""
    text = unicodedata.normalize("NFKC", text)
    text = re.sub(r"\s+", " ", text).strip()
    text = text.replace("’", "'").replace("‘", "'")
    text = text.replace("“", '"').replace("”", '"')
    return text


def token_count(text: str) -> int:
    """Whitespace token count; a cheap proxy for sentence length."""
    return len(text.split())


# ----------------------------
# Quality scoring
# ----------------------------
def length_ratio_score(src_len: int, tgt_len: int) -> float:
    """Score how plausible the target/source length ratio is.

    Ratios in [0.75, 1.5] score 1.0; ratios outside [0.5, 2.0] score 0.0;
    in between, the score falls off linearly around a centre of ~1.1
    (FR -> UK translations tend to run slightly longer than the source).
    """
    ratio = tgt_len / max(src_len, 1)
    if ratio < 0.5 or ratio > 2.0:
        return 0.0
    if 0.75 <= ratio <= 1.5:
        return 1.0
    return max(0.0, 1.0 - abs(ratio - 1.1))


def lexical_density_score(text: str) -> float:
    """Penalize very repetitive or trivial translations via the
    unique-token ratio, scaled by 1.5 and capped at 1.0."""
    tokens = text.split()
    if not tokens:
        return 0.0
    unique_ratio = len(set(tokens)) / len(tokens)
    return min(1.0, unique_ratio * 1.5)


def quality_score(src: str, tgt: str) -> float:
    """Weighted blend: 70% length-ratio plausibility, 30% lexical density."""
    l_score = length_ratio_score(token_count(src), token_count(tgt))
    d_score = lexical_density_score(tgt)
    return 0.7 * l_score + 0.3 * d_score


# ----------------------------
# Load + clean + score
# ----------------------------
# OrderedDict keyed by normalized source text: deduplicates on the source
# side while preserving the original line order of the input file.
unique_sources = OrderedDict()
stats = {
    "total": 0,
    "removed_length": 0,
    "removed_duplicates": 0,
    "removed_quality": 0,
}

with open(INPUT_FILE, "r", encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        if not line:  # tolerate blank lines in the JSONL input
            continue
        stats["total"] += 1
        item = json.loads(line)

        src = normalize_text(item["text"])
        tgt = normalize_text(item["translation"])
        src_len = token_count(src)
        tgt_len = token_count(tgt)

        # Length filtering
        if not (MIN_TOKENS <= src_len <= MAX_TOKENS):
            stats["removed_length"] += 1
            continue
        if not (MIN_TOKENS <= tgt_len <= MAX_TOKENS):
            stats["removed_length"] += 1
            continue

        # Deduplication (first occurrence of a source wins)
        if src in unique_sources:
            stats["removed_duplicates"] += 1
            continue

        # Quality score
        q_score = quality_score(src, tgt)
        if q_score < MIN_QUALITY_SCORE:
            stats["removed_quality"] += 1
            continue

        unique_sources[src] = {
            "translation": tgt,
            "quality_score": round(q_score, 3),
        }

# ----------------------------
# Report
# ----------------------------
print(f"Total lines processed: {stats['total']}")
print(f"Removed (length): {stats['removed_length']}")
print(f"Removed (duplicates): {stats['removed_duplicates']}")
print(f"Removed (quality): {stats['removed_quality']}")
print(f"Final kept pairs: {len(unique_sources)}")

# ----------------------------
# Save cleaned dataset
# ----------------------------
with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
    for src, data in unique_sources.items():
        json.dump(
            {
                "text": src,
                "translation": data["translation"],
                "quality_score": data["quality_score"],
            },
            f,
            ensure_ascii=False,
        )
        f.write("\n")

print("=== Cleaning completed ===")
print(f"Clean dataset saved to: {OUTPUT_FILE}")
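
# ----------------------------
# Sanity check (illustrative)
# ----------------------------
# A minimal sketch of how the scorer behaves on hand-written pairs. The
# pairs below are hypothetical examples, not taken from paires.json; they
# only illustrate that a length-balanced, varied pair scores high while a
# short, repetitive target falls well below MIN_QUALITY_SCORE.
_good_src = "Le chat noir dort paisiblement sur le canapé du salon."
_good_tgt = "Чорний кіт спокійно спить на дивані у вітальні."
_bad_tgt = "так так так"

print("Sanity check (balanced pair):", round(quality_score(_good_src, _good_tgt), 3))    # -> 1.0
print("Sanity check (degenerate pair):", round(quality_score(_good_src, _bad_tgt), 3))   # -> 0.15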