cv-2026/scripts/fetch_telegram_jobs.py

313 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "telethon>=1.42",
# "python-dotenv>=1.0",
# ]
# ///
"""
Fetch new messages from a list of Telegram channels (job-vacancy feeds),
filter them per-channel by curated keywords, and surface untriaged ("new")
channels for keyword decisions.
Inputs:
- channel usernames/ids as positional args, OR `-` to read a JSON array from stdin
- .env in the project root (TELEGRAM_API_ID, TELEGRAM_API_HASH, TELEGRAM_SESSION_STRING)
TELEGRAM_SESSION_STRING must be the **usulsu** (main) account — the "Jobs" folder lives there.
- tracking/telegram_state.json — per-channel last_message_id (created if missing)
- tracking/telegram_channels.json — per-channel curated metadata (lang, priority)
and filter (include/exclude). See tracking/CLAUDE.md.
Outputs:
- tracking/telegram_inbox.json — kept messages (filtered for known
channels, unfiltered for new ones)
- tracking/telegram_pending_channels.json — keyword-frequency scan for new
channels (only when present;
deleted otherwise)
- tracking/telegram_state.json — updated with newest seen ids
- stdout — summary, with prominent "NEW CHANNELS" line when pending exist
"""
import asyncio
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from dotenv import load_dotenv
from telethon import TelegramClient
from telethon.sessions import StringSession
# Project root is the parent of this script's directory
# (<root>/scripts/fetch_telegram_jobs.py -> <root>).
PROJECT_ROOT = Path(__file__).resolve().parent.parent
_TRACKING_DIR = PROJECT_ROOT / "tracking"

# Persistent per-channel cursors (last_message_id / last_seen_date).
STATE_FILE = _TRACKING_DIR / "telegram_state.json"
# Messages kept by this run (rewritten on every run).
OUTPUT_FILE = _TRACKING_DIR / "telegram_inbox.json"
# Curated per-channel metadata (lang, priority) and keyword filters.
CHANNELS_FILE = _TRACKING_DIR / "telegram_channels.json"
# Keyword-frequency scan for untriaged channels; removed when there are none.
PENDING_FILE = _TRACKING_DIR / "telegram_pending_channels.json"

# How many days back to read the first time a channel is seen.
DEFAULT_LOOKBACK_DAYS = 30
# Per-channel, per-run message cap so a busy channel cannot run away.
MAX_PER_CHANNEL = 500
def load_credentials():
    """
    Load Telegram API credentials from <project root>/.env and the environment.

    Returns:
        (api_id, api_hash, session_string) with api_id as an int.

    Exits (sys.exit with a readable message) when a variable is missing or
    blank, or when TELEGRAM_API_ID is not an integer. A blank session string
    is rejected explicitly: an empty StringSession is not authorized, so
    entering the client would try to start an interactive login rather than
    fail clearly.
    """
    load_dotenv(PROJECT_ROOT / ".env")
    values = {}
    for name in ("TELEGRAM_API_ID", "TELEGRAM_API_HASH", "TELEGRAM_SESSION_STRING"):
        value = os.environ.get(name, "").strip()
        if not value:
            sys.exit(f"missing env var: '{name}'. check .env in project root.")
        values[name] = value
    try:
        api_id = int(values["TELEGRAM_API_ID"])
    except ValueError:
        sys.exit("TELEGRAM_API_ID must be an integer. check .env in project root.")
    return api_id, values["TELEGRAM_API_HASH"], values["TELEGRAM_SESSION_STRING"]
def load_json(path, default):
    """
    Return the parsed JSON content of `path`, or `default` if it does not exist.

    The file is decoded as UTF-8 explicitly: these files are written with
    ensure_ascii=False, so they contain raw non-ASCII text, and the platform
    default encoding (e.g. cp1252 on Windows) would misread them.
    """
    if path.exists():
        return json.loads(path.read_text(encoding="utf-8"))
    return default
def save_json(path, data):
    """
    Write `data` to `path` as pretty-printed UTF-8 JSON, atomically.

    Parent directories are created as needed. The JSON is written to a
    sibling temp file and then renamed over `path`. A crash mid-write
    therefore leaves the previous file intact instead of a truncated one,
    which matters for the state file that holds the per-channel cursors.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(data, indent=2, ensure_ascii=False) + "\n"
    tmp = path.with_name(path.name + ".tmp")
    try:
        tmp.write_text(payload, encoding="utf-8")
        tmp.replace(path)
    finally:
        # No-op after a successful replace(); cleans up a partial temp file otherwise.
        tmp.unlink(missing_ok=True)
def channel_key(value):
    """
    Turn a CLI/JSON channel reference into what Telethon's iter_messages gets.

    - ints are returned unchanged;
    - strings that are an optionally negative decimal number ("-100123")
      become ints;
    - anything else is treated as a username, with leading "@" removed.

    Only one leading "-" is accepted, and str.isdecimal() is used instead of
    isdigit(). Inputs like "--5" or "²" used to pass the check and then crash
    in int(). This function runs outside main's per-channel error handling,
    so such a crash would abort the whole run.
    """
    if isinstance(value, int):
        return value
    digits = value[1:] if value.startswith("-") else value
    if digits.isdecimal():
        return int(value)
    return value.lstrip("@")
def _as_list(value):
    """Coerce a filter field to a list: None/"" -> [], "kw" -> ["kw"], iterable -> list."""
    if value is None or value == "":
        return []
    if isinstance(value, str):
        return [value]
    return list(value)


def normalize_filter(spec):
    """
    Normalize a channel filter spec to {"groups": [...], "excludes": [...]}.
    Accepted input forms:
    [] / None -> trust-all (no filter)
    ["a", "b"] -> single OR-group
    [["a","b"], ["c","d"]] -> AND of OR-groups
    [["a","b"], "c"] -> scalars promoted
    "a" -> single keyword (same as ["a"])
    {"include": <any of the above>,
     "exclude": ["x", "y"] | "x"} -> include + negative filter
    Missing or null "include"/"exclude" keys mean "none". A bare string is a
    single keyword. It is never split into characters, which previously made
    e.g. "exclude": "junior" reject every message containing "j", "u", ...
    Filter semantics: message passes if (no exclude keyword matches) AND
    (every include-group has at least one match). Empty include = trust-all
    (only exclude is applied).
    """
    if isinstance(spec, dict):
        include = _as_list(spec.get("include"))
        excludes = _as_list(spec.get("exclude"))
    else:
        include = _as_list(spec)
        excludes = []
    if not include:
        groups = []
    elif any(isinstance(item, list) for item in include):
        # Mixed/nested form: each list is an OR-group, each scalar its own group.
        groups = [list(item) if isinstance(item, list) else [item] for item in include]
    else:
        groups = [include]
    return {"groups": groups, "excludes": excludes}
def msg_passes_filter(text, spec):
    """
    Return True if `text` satisfies the channel filter `spec`.

    `spec` is any form accepted by normalize_filter(). Matching is a
    case-insensitive substring test. Any exclude keyword rejects the message.
    Otherwise every non-empty include group needs at least one matching
    keyword. With no include groups, anything that survived the exclude check
    passes.
    """
    normalized = normalize_filter(spec)
    haystack = text.lower()

    def found(keyword):
        return keyword.lower() in haystack

    # A single exclude hit vetoes the message outright.
    if any(found(banned) for banned in normalized["excludes"]):
        return False
    # AND across groups, OR within a group; an empty group imposes nothing.
    for group in normalized["groups"]:
        if group and not any(found(kw) for kw in group):
            return False
    return True
def flatten_keywords(keywords_config):
    """
    Return the sorted, de-duplicated set of every keyword in the config.

    Both include-group keywords and exclude keywords are collected, across
    all channel specs in `keywords_config` (a mapping of channel -> spec).
    """
    vocabulary = set()
    for normalized in map(normalize_filter, keywords_config.values()):
        vocabulary.update(kw for group in normalized["groups"] for kw in group)
        vocabulary.update(normalized["excludes"])
    return sorted(vocabulary)
def keyword_frequencies(messages, all_keywords):
    """
    Count, per keyword, how many messages contain it (case-insensitive substring).

    Keywords with zero hits are omitted. The result is ordered by count,
    descending; ties keep their order from `all_keywords`.
    """
    lowered = [msg["text"].lower() for msg in messages]
    tallies = {}
    for keyword in all_keywords:
        needle = keyword.lower()
        hits = sum(needle in body for body in lowered)
        if hits:
            tallies[keyword] = hits
    ranked = sorted(tallies.items(), key=lambda item: item[1], reverse=True)
    return dict(ranked)
def _message_link(key, msg_id):
    """
    Build a t.me permalink for message `msg_id` in channel `key`, or None.

    - username keys -> public link https://t.me/<username>/<id>
    - marked channel ids (-100<channel_id>) -> member-only private link
      https://t.me/c/<channel_id>/<id>
    - any other numeric id -> None (no permalink form)
    """
    if isinstance(key, str):
        return f"https://t.me/{key}/{msg_id}"
    marked = str(key)
    if marked.startswith("-100") and marked[4:].isdecimal():
        return f"https://t.me/c/{marked[4:]}/{msg_id}"
    return None


async def fetch_channel(client, key, last_id, lookback_dt):
    """
    Fetch messages from one channel that are newer than the stored cursor.

    Args:
        client: connected TelegramClient.
        key: channel username (str) or numeric id (int), as from channel_key().
        last_id: last_message_id from state, or None on the first run.
        lookback_dt: timezone-aware datetime. On the first run, scanning stops
            at the first message older than this.

    Returns:
        (messages, max_id, max_date, truncated), where
        - messages: dicts (id, date, text, has_media, link) in chronological
          order. Messages with neither text nor media are dropped.
        - max_id / max_date: the newest message seen, including dropped or
          too-old ones. max_id falls back to last_id or 0, max_date to None.
        - truncated: True when the MAX_PER_CHANNEL cap (not the end of history
          or the lookback window) stopped the scan. A caller that stores
          max_id as the new cursor will not revisit messages beyond the cap.
    """
    messages = []
    max_id = last_id or 0
    max_date = None
    kwargs = {"limit": MAX_PER_CHANNEL}
    if last_id:
        kwargs["min_id"] = last_id
    count = 0
    hit_lookback = False
    async for msg in client.iter_messages(key, **kwargs):
        count += 1
        # Anchor cursor to newest id we encounter even if we discard the
        # message (too old, no content). iter_messages yields newest-first.
        if msg.id > max_id:
            max_id = msg.id
            max_date = msg.date
        if last_id is None and msg.date < lookback_dt:
            hit_lookback = True
            break
        text = (msg.message or "").strip()
        # Messages with neither text nor media carry nothing to triage.
        if text or msg.media:
            messages.append({
                "id": msg.id,
                "date": msg.date.isoformat(),
                "text": text,
                "has_media": bool(msg.media),
                "link": _message_link(key, msg.id),
            })
        if count >= MAX_PER_CHANNEL:
            break
    # Decide truncation after the loop. A dropped (empty) message at the cap
    # position must still count. A lookback stop is a natural end, not a
    # truncation.
    truncated = count >= MAX_PER_CHANNEL and not hit_lookback
    messages.reverse()  # chronological
    return messages, max_id, max_date, truncated
async def _process_channel(client, raw, state, channels_config, all_keywords, lookback_dt):
    """
    Fetch one channel and build its inbox entry.

    Returns (entry, pending_entry, cursor):
      entry         -- dict to store under output["channels"][str(raw)]
      pending_entry -- keyword-scan dict when the channel is untriaged, else None
      cursor        -- new state record when the newest id advanced, else None
    Fetch errors are recorded in entry["error"] rather than raised, so one bad
    channel does not abort the whole run.
    """
    name = str(raw)
    key = channel_key(raw)
    last_id = state.get(name, {}).get("last_message_id")
    first_run = last_id is None
    ch_cfg = channels_config.get(name)
    # Only dict-form configs carry lang/priority; list-form filters have none.
    ch_lang = ch_cfg.get("lang") if isinstance(ch_cfg, dict) else None
    ch_priority = ch_cfg.get("priority") if isinstance(ch_cfg, dict) else None
    try:
        msgs, max_id, max_date, truncated = await fetch_channel(
            client, key, last_id, lookback_dt
        )
    except Exception as e:  # best-effort per channel: record and move on
        entry = {
            "lang": ch_lang,
            "priority": ch_priority,
            "error": f"{type(e).__name__}: {e}",
            "messages": [],
        }
        return entry, None, None

    pending_entry = None
    if ch_cfg is None:
        # New / untriaged channel — pass everything through unfiltered,
        # but log keyword-frequency scan for the agent to triage.
        kept = msgs
        filter_mode = "unfiltered (new channel — not yet curated)"
        pending_entry = {
            "messages_scanned": len(msgs),
            "first_run": first_run,
            "truncated": truncated,
            "keyword_counts_from_other_channels": (
                keyword_frequencies(msgs, all_keywords) if all_keywords else {}
            ),
            "note": (
                "decide lang, priority (p1/p2/p3) and keywords (existing or "
                f"new) for this channel; add an entry to "
                f"{CHANNELS_FILE.relative_to(PROJECT_ROOT)} — see tracking/CLAUDE.md"
            ),
        }
    else:
        f = normalize_filter(ch_cfg)
        has_constraints = bool(f["groups"] or f["excludes"])
        kept = [m for m in msgs if msg_passes_filter(m["text"], ch_cfg)]
        filter_mode = (
            f"filtered (groups={len(f['groups'])}, excludes={len(f['excludes'])})"
            if has_constraints
            else "trust-all (no filter)"
        )
    entry = {
        "lang": ch_lang,
        "priority": ch_priority,
        "seen": len(msgs),
        "kept": len(kept),
        "filtered_out": len(msgs) - len(kept),
        "first_run": first_run,
        "truncated": truncated,
        "filter_mode": filter_mode,
        "messages": kept,
    }
    cursor = None
    if max_id > (last_id or 0):
        cursor = {
            "last_message_id": max_id,
            "last_seen_date": max_date.isoformat() if max_date else None,
        }
    return entry, pending_entry, cursor


def _print_summary(output, pending, n_channels):
    """Print the run summary: inbox size, untriaged channels, and fetch errors."""
    rel_inbox = OUTPUT_FILE.relative_to(PROJECT_ROOT)
    rel_pending = PENDING_FILE.relative_to(PROJECT_ROOT)
    rel_chans = CHANNELS_FILE.relative_to(PROJECT_ROOT)
    print(
        f"{output['total_in_inbox']} messages in inbox "
        f"(from {n_channels} channels) → {rel_inbox}"
    )
    if pending:
        names = ", ".join(pending.keys())
        print(f"NEW CHANNELS ({len(pending)}): {names}")
        print(f" keyword-frequency scan → {rel_pending}")
        print(f" curate lang/priority/keywords in {rel_chans} (see tracking/CLAUDE.md)")
    # Failed fetches were previously visible only inside the inbox JSON.
    errors = {
        name: ch["error"] for name, ch in output["channels"].items() if "error" in ch
    }
    if errors:
        print(f"ERRORS ({len(errors)}): {', '.join(errors)}")
        for name, err in errors.items():
            print(f" {name}: {err}")


async def main(channels):
    """
    Fetch, filter and record new messages for every channel in `channels`.

    Channels are de-duplicated on str(raw), the same key used for the state
    and config lookups. Without this, a repeated channel would be re-fetched
    with its already-advanced cursor and overwrite its inbox entry with an
    empty one, losing those messages.

    The inbox and the pending-channel scan are written BEFORE the new cursors
    are committed to the state file. A failure while writing outputs then
    leads, at worst, to re-fetching messages on the next run, never to
    silently skipping them.
    """
    api_id, api_hash, session = load_credentials()
    state = load_json(STATE_FILE, {})
    channels_config = load_json(CHANNELS_FILE, {})
    all_keywords = flatten_keywords(channels_config)
    now = datetime.now(timezone.utc)
    lookback_dt = now - timedelta(days=DEFAULT_LOOKBACK_DAYS)
    names = list(dict.fromkeys(str(raw) for raw in channels))
    output = {
        "generated_at": now.isoformat(),
        "lookback_days_for_new_channels": DEFAULT_LOOKBACK_DAYS,
        "channels": {},
        "total_in_inbox": 0,
    }
    pending = {}
    async with TelegramClient(StringSession(session), api_id, api_hash) as client:
        for name in names:
            entry, pending_entry, cursor = await _process_channel(
                client, name, state, channels_config, all_keywords, lookback_dt
            )
            output["channels"][name] = entry
            output["total_in_inbox"] += len(entry["messages"])
            if pending_entry is not None:
                pending[name] = pending_entry
            if cursor is not None:
                state[name] = cursor
    save_json(OUTPUT_FILE, output)
    if pending:
        save_json(PENDING_FILE, pending)
    elif PENDING_FILE.exists():
        PENDING_FILE.unlink()
    # Commit cursors last: only once the messages they cover are on disk.
    save_json(STATE_FILE, state)
    _print_summary(output, pending, len(names))
def parse_args():
    """
    Return the list of channels to fetch.

    Channels come from argv. When the first argument is `-`, they come from a
    JSON array read on stdin instead, and any further args are ignored. Exits
    with a message when no channels are given, or when stdin is not a
    non-empty JSON array of strings/ints. Requiring at least one channel
    mirrors the argv path; without it, an empty list would still rewrite the
    inbox as empty.
    """
    if len(sys.argv) < 2:
        sys.exit(
            "usage: fetch_telegram_jobs.py <channel1> [<channel2> ...]\n"
            " fetch_telegram_jobs.py - (read JSON array of channels from stdin)"
        )
    if sys.argv[1] != "-":
        return sys.argv[1:]
    try:
        channels = json.loads(sys.stdin.read())
    except json.JSONDecodeError as e:
        sys.exit(f"stdin is not valid JSON: {e}")
    if not isinstance(channels, list) or not channels:
        sys.exit("stdin must be a non-empty JSON array of channel usernames/ids")
    # bool is an int subclass in Python, so reject it explicitly.
    bad = [c for c in channels if isinstance(c, bool) or not isinstance(c, (str, int))]
    if bad:
        sys.exit(f"invalid channel entries in stdin array: {bad!r}")
    return channels
if __name__ == "__main__":
asyncio.run(main(parse_args()))