#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = [
#   "telethon>=1.42",
#   "python-dotenv>=1.0",
# ]
# ///
"""
Fetch new messages from a list of Telegram channels (job-vacancy feeds),
filter them per-channel by curated keywords, and surface untriaged ("new")
channels for keyword decisions.

Inputs:
  - channel usernames/ids as positional args, OR `-` to read a JSON array of
    them from stdin
  - .env in the project root (TELEGRAM_API_ID, TELEGRAM_API_HASH,
    TELEGRAM_SESSION_STRING). TELEGRAM_SESSION_STRING must be the **usulsu**
    (main) account — the "Jobs" folder lives there.
  - tracking/telegram_state.json — per-channel last_message_id (created if
    missing)
  - tracking/telegram_channels.json — per-channel curated metadata (lang,
    priority) and filter (include/exclude). See tracking/CLAUDE.md.

Outputs:
  - tracking/telegram_inbox.json — kept messages (filtered for known channels,
    unfiltered for new ones)
  - tracking/telegram_pending_channels.json — keyword-frequency scan for new
    channels (written only when there are any; deleted otherwise)
  - tracking/telegram_state.json — updated with the newest seen ids
  - stdout — summary, with a prominent "NEW CHANNELS" line when pending
    channels exist and an "ERRORS" line when a channel could not be fetched
"""

import asyncio
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path

# telethon / python-dotenv are imported inside the functions that need them
# (load_credentials, main) so the pure filtering helpers below stay importable
# and unit-testable without the network client installed.

PROJECT_ROOT = Path(__file__).resolve().parent.parent
STATE_FILE = PROJECT_ROOT / "tracking" / "telegram_state.json"
OUTPUT_FILE = PROJECT_ROOT / "tracking" / "telegram_inbox.json"
CHANNELS_FILE = PROJECT_ROOT / "tracking" / "telegram_channels.json"
PENDING_FILE = PROJECT_ROOT / "tracking" / "telegram_pending_channels.json"

# First time we see a channel, how far back to look
DEFAULT_LOOKBACK_DAYS = 30
# Hard cap per channel per run, to avoid runaway on busy channels
MAX_PER_CHANNEL = 500


def load_credentials():
    """Read Telegram API credentials from the project-root .env / environment.

    Returns:
        (api_id, api_hash, session_string), with api_id as an int.

    Exits the process with a readable message if a variable is missing or
    TELEGRAM_API_ID is not an integer.
    """
    from dotenv import load_dotenv

    load_dotenv(PROJECT_ROOT / ".env")
    try:
        raw_api_id = os.environ["TELEGRAM_API_ID"]
        api_hash = os.environ["TELEGRAM_API_HASH"]
        session = os.environ["TELEGRAM_SESSION_STRING"]
    except KeyError as e:
        sys.exit(f"missing env var: {e}. check .env in project root.")
    try:
        api_id = int(raw_api_id)
    except ValueError:
        sys.exit("TELEGRAM_API_ID must be an integer. check .env in project root.")
    return api_id, api_hash, session


def load_json(path, default):
    """Return the parsed JSON content of *path*, or *default* if it is missing.

    Malformed JSON aborts the run with a clear message instead of silently
    falling back to *default*. For the state file, that fallback would reset
    every channel's cursor.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return default
    try:
        return json.loads(text)
    except json.JSONDecodeError as e:
        sys.exit(f"malformed JSON in {path}: {e}")


def save_json(path, data):
    """Write *data* to *path* as indented UTF-8 JSON, replacing it atomically.

    The payload is written to a sibling ".tmp" file and then moved over
    *path* with os.replace, so an interrupted run never leaves a truncated
    file behind. The encoding is explicit because ensure_ascii=False emits
    raw non-ASCII text, which the platform default encoding may not handle.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(data, indent=2, ensure_ascii=False) + "\n"
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(payload, encoding="utf-8")
    os.replace(tmp, path)


def channel_key(value):
    """Convert a CLI/JSON channel reference into the key passed to Telethon.

    Ints and integer-looking strings (optionally one leading "-", e.g.
    "-1001234567890") become int ids. Anything else is treated as a username
    and has any leading "@" removed.
    """
    if isinstance(value, int):
        return value
    digits = value[1:] if value.startswith("-") else value
    # isdecimal (not isdigit): superscripts like "²" pass isdigit but
    # make int() raise.
    if digits.isdecimal():
        return int(value)
    return value.lstrip("@")


def _as_list(value):
    """Coerce a keyword field to a list.

    None and "" become [], a bare string becomes a one-item list, and other
    iterables are copied into a new list.
    """
    if not value:
        return []
    if isinstance(value, str):
        return [value]
    return list(value)


def normalize_filter(spec):
    """
    Normalize a channel filter spec to {"groups": [...], "excludes": [...]}.

    Accepted input forms:
      [] / None                         -> trust-all (no filter)
      "a"                               -> same as ["a"]
      ["a", "b"]                        -> single OR-group
      [["a","b"], ["c","d"]]            -> AND of OR-groups
      [["a","b"], "c"]                  -> scalars promoted to one-item groups
      {"include": <any of the above>,
       "exclude": ["x", "y"] or "x"}    -> include + negative filter

    Other keys in a dict spec (lang, priority, ...) are ignored here.

    Filter semantics: a message passes if (no exclude keyword matches) AND
    (every include-group has at least one match). Empty include = trust-all
    (only exclude is applied).
    """
    if isinstance(spec, dict):
        include = _as_list(spec.get("include"))
        excludes = _as_list(spec.get("exclude"))
    else:
        include = _as_list(spec)
        excludes = []

    if not include:
        groups = []
    elif any(isinstance(item, list) for item in include):
        groups = [item if isinstance(item, list) else [item] for item in include]
    else:
        groups = [include]
    return {"groups": groups, "excludes": excludes}


def _passes_normalized(text, f):
    """Apply an already-normalized filter *f* to *text* (case-insensitive)."""
    t = text.lower()
    # Any exclude hit -> reject immediately
    if any(ex.lower() in t for ex in f["excludes"]):
        return False
    # Empty groups impose no constraint, and no groups at all means
    # trust-all (all() of nothing is True).
    return all(any(kw.lower() in t for kw in group) for group in f["groups"] if group)


def msg_passes_filter(text, spec):
    """Return True if *text* passes the channel filter *spec*.

    *spec* is any form accepted by normalize_filter. Matching is a
    case-insensitive substring test.
    """
    return _passes_normalized(text, normalize_filter(spec))


def flatten_keywords(keywords_config):
    """Return a sorted, de-duplicated list of every keyword across all channels.

    Both include and exclude keywords are collected.
    """
    out = set()
    for spec in keywords_config.values():
        f = normalize_filter(spec)
        for group in f["groups"]:
            out.update(group)
        out.update(f["excludes"])
    return sorted(out)


def keyword_frequencies(messages, all_keywords):
    """Count how many messages contain each keyword (case-insensitive substring).

    Keywords that match no message are omitted. The result is ordered by
    count, highest first; ties keep the order of *all_keywords* because the
    sort is stable.
    """
    texts_lower = [m["text"].lower() for m in messages]
    counts = {}
    for kw in all_keywords:
        kw_lower = kw.lower()
        n = sum(kw_lower in t for t in texts_lower)
        if n:
            counts[kw] = n
    return dict(sorted(counts.items(), key=lambda kv: -kv[1]))


async def fetch_channel(client, key, last_id, lookback_dt):
    """Fetch the channel's messages newer than *last_id*.

    On a first run (*last_id* is None), fetch messages not older than
    *lookback_dt* instead.

    At most MAX_PER_CHANNEL messages are processed. Messages with neither
    text nor media are dropped, but they still count toward the cap.

    Returns:
        (messages, max_id, max_date, truncated):
          messages  -- chronological list of dicts
                       (id, date, text, has_media, link)
          max_id    -- newest message id encountered (or last_id / 0 if none),
                       used as the next cursor
          max_date  -- date of the message that set max_id, or None
          truncated -- True when more than MAX_PER_CHANNEL messages were in
                       range. The cursor still advances to max_id, so the
                       unfetched older messages are skipped for good.
    """
    messages = []
    max_id = last_id or 0
    max_date = None
    truncated = False
    processed = 0

    # Request one message beyond the cap. Seeing it is how we know the cap
    # was really exceeded, rather than the channel having exactly
    # MAX_PER_CHANNEL new messages.
    kwargs = {"limit": MAX_PER_CHANNEL + 1}
    if last_id:
        kwargs["min_id"] = last_id

    async for msg in client.iter_messages(key, **kwargs):
        # iter_messages yields newest-first. Anchor the cursor to the newest
        # id we encounter even if the message is discarded below (too old,
        # no content).
        if msg.id > max_id:
            max_id = msg.id
            max_date = msg.date
        if last_id is None and msg.date < lookback_dt:
            break  # past the first-run lookback window
        if processed >= MAX_PER_CHANNEL:
            truncated = True
            break
        processed += 1

        text = (msg.message or "").strip()
        if not text and not msg.media:
            continue
        messages.append({
            "id": msg.id,
            "date": msg.date.isoformat(),
            "text": text,
            "has_media": bool(msg.media),
            "link": f"https://t.me/{key}/{msg.id}" if isinstance(key, str) else None,
        })

    messages.reverse()  # chronological
    return messages, max_id, max_date, truncated


def _unique_channels(channels):
    """Drop repeated channels (compared by str()), keeping first occurrences.

    Processing a channel twice would re-fetch it with the already-advanced
    in-memory cursor. That would overwrite the first result with an empty
    one and double-count the inbox total.
    """
    seen = set()
    unique = []
    for raw in channels:
        name = str(raw)
        if name not in seen:
            seen.add(name)
            unique.append(raw)
    return unique


def _channel_meta(ch_cfg):
    """Return (lang, priority) from a curated dict entry, else (None, None)."""
    if isinstance(ch_cfg, dict):
        return ch_cfg.get("lang"), ch_cfg.get("priority")
    return None, None


def _apply_filter(msgs, ch_cfg):
    """Filter *msgs* by a curated channel spec; return (kept, filter_mode)."""
    f = normalize_filter(ch_cfg)
    kept = [m for m in msgs if _passes_normalized(m["text"], f)]
    if f["groups"] or f["excludes"]:
        filter_mode = f"filtered (groups={len(f['groups'])}, excludes={len(f['excludes'])})"
    else:
        filter_mode = "trust-all (no filter)"
    return kept, filter_mode


def _pending_entry(msgs, first_run, truncated, all_keywords):
    """Build the triage record written to PENDING_FILE for an uncurated channel."""
    return {
        "messages_scanned": len(msgs),
        "first_run": first_run,
        "truncated": truncated,
        "keyword_counts_from_other_channels": (
            keyword_frequencies(msgs, all_keywords) if all_keywords else {}
        ),
        "note": (
            "decide lang, priority (p1/p2/p3) and keywords (existing or "
            "new) for this channel; add an entry to "
            f"{CHANNELS_FILE.relative_to(PROJECT_ROOT)} — see tracking/CLAUDE.md"
        ),
    }


def _print_summary(output, pending, errors, n_channels):
    """Print the run summary, highlighting new channels and fetch errors."""
    rel_inbox = OUTPUT_FILE.relative_to(PROJECT_ROOT)
    rel_pending = PENDING_FILE.relative_to(PROJECT_ROOT)
    rel_chans = CHANNELS_FILE.relative_to(PROJECT_ROOT)
    print(
        f"{output['total_in_inbox']} messages in inbox "
        f"(from {n_channels} channels) → {rel_inbox}"
    )
    if pending:
        names = ", ".join(pending.keys())
        print(f"NEW CHANNELS ({len(pending)}): {names}")
        print(f"  keyword-frequency scan → {rel_pending}")
        print(f"  curate lang/priority/keywords in {rel_chans} (see tracking/CLAUDE.md)")
    if errors:
        print(f"ERRORS ({len(errors)}): {', '.join(errors)}")
        print(f"  details under \"error\" in {rel_inbox}")


async def main(channels):
    """Fetch, filter and record every channel in *channels*.

    Writes the inbox, pending and state files, then prints a summary.
    State, inbox and channel config are all keyed by str(channel) as given
    on input.
    """
    from telethon import TelegramClient
    from telethon.sessions import StringSession

    channels = _unique_channels(channels)
    api_id, api_hash, session = load_credentials()
    state = load_json(STATE_FILE, {})
    channels_config = load_json(CHANNELS_FILE, {})
    all_keywords = flatten_keywords(channels_config)

    now = datetime.now(timezone.utc)
    lookback_dt = now - timedelta(days=DEFAULT_LOOKBACK_DAYS)

    output = {
        "generated_at": now.isoformat(),
        "lookback_days_for_new_channels": DEFAULT_LOOKBACK_DAYS,
        "channels": {},
        "total_in_inbox": 0,
    }
    pending = {}
    errors = []

    async with TelegramClient(StringSession(session), api_id, api_hash) as client:
        for raw in channels:
            name = str(raw)
            key = channel_key(raw)
            last_id = state.get(name, {}).get("last_message_id")
            first_run = last_id is None
            ch_cfg = channels_config.get(name)
            ch_lang, ch_priority = _channel_meta(ch_cfg)

            try:
                msgs, max_id, max_date, truncated = await fetch_channel(
                    client, key, last_id, lookback_dt
                )
            except Exception as e:  # one bad channel must not abort the whole run
                output["channels"][name] = {
                    "lang": ch_lang,
                    "priority": ch_priority,
                    "error": f"{type(e).__name__}: {e}",
                    "messages": [],
                }
                errors.append(name)
                continue

            if ch_cfg is None:
                # New / untriaged channel: pass everything through unfiltered,
                # and record a keyword-frequency scan so the agent can triage it.
                kept = msgs
                filter_mode = "unfiltered (new channel — not yet curated)"
                pending[name] = _pending_entry(msgs, first_run, truncated, all_keywords)
            else:
                kept, filter_mode = _apply_filter(msgs, ch_cfg)

            output["channels"][name] = {
                "lang": ch_lang,
                "priority": ch_priority,
                "seen": len(msgs),
                "kept": len(kept),
                "filtered_out": len(msgs) - len(kept),
                "first_run": first_run,
                "truncated": truncated,
                "filter_mode": filter_mode,
                "messages": kept,
            }
            output["total_in_inbox"] += len(kept)

            if max_id > (last_id or 0):
                state[name] = {
                    "last_message_id": max_id,
                    "last_seen_date": max_date.isoformat() if max_date else None,
                }

    # Persist the inbox before advancing the cursors. If the run dies in
    # between, the messages are re-fetched next time instead of being lost.
    save_json(OUTPUT_FILE, output)
    if pending:
        save_json(PENDING_FILE, pending)
    elif PENDING_FILE.exists():
        PENDING_FILE.unlink()
    save_json(STATE_FILE, state)

    _print_summary(output, pending, errors, len(channels))


def parse_args():
    """Return the channel list from argv, or from a JSON array on stdin ("-").

    Exits with a usage/validation message on bad input.
    """
    if len(sys.argv) < 2:
        sys.exit(
            "usage: fetch_telegram_jobs.py <channel> [<channel> ...]\n"
            "       fetch_telegram_jobs.py -  (read JSON array of channels from stdin)"
        )
    if sys.argv[1] != "-":
        return sys.argv[1:]
    try:
        channels = json.loads(sys.stdin.read())
    except json.JSONDecodeError as e:
        sys.exit(f"stdin is not valid JSON: {e}")
    # A JSON string or object would otherwise be iterated char-by-char / key-by-key.
    if not isinstance(channels, list) or not all(
        isinstance(c, (str, int)) and not isinstance(c, bool) for c in channels
    ):
        sys.exit("stdin must be a JSON array of channel usernames/ids")
    return channels


if __name__ == "__main__":
    asyncio.run(main(parse_args()))