312 lines
11 KiB
Python
Executable File
312 lines
11 KiB
Python
Executable File
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "telethon>=1.42",
# "python-dotenv>=1.0",
# ]
# ///

"""
Fetch new messages from a list of Telegram channels (job-vacancy feeds),
filter them per channel by curated keywords, and surface untriaged ("new")
channels so that keyword decisions can be made for them.

Inputs:
  - channel usernames/ids as positional args, OR `-` to read a JSON array
    from stdin
  - .env in the project root (TELEGRAM_API_ID, TELEGRAM_API_HASH,
    TELEGRAM_SESSION_STRING)
  - tracking/telegram_state.json — per-channel last_message_id (created if
    missing)
  - tracking/telegram_channels.json — per-channel curated metadata (lang,
    priority) and filter (include/exclude). See tracking/CLAUDE.md.

Outputs:
  - tracking/telegram_inbox.json — kept messages (filtered for known
    channels, unfiltered for new ones)
  - tracking/telegram_pending_channels.json — keyword-frequency scan for new
    channels (written only when new channels were seen; deleted otherwise)
  - tracking/telegram_state.json — updated with the newest seen message ids
  - stdout — summary, with a prominent "NEW CHANNELS" line when pending
    channels exist
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import sys
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
|
|
from dotenv import load_dotenv
|
|
from telethon import TelegramClient
|
|
from telethon.sessions import StringSession
|
|
|
|
# Root of the project: the parent of the directory holding this script.
PROJECT_ROOT = Path(__file__).resolve().parent.parent

# Every file this script reads or writes (besides .env) lives here.
_TRACKING_DIR = PROJECT_ROOT / "tracking"
STATE_FILE = _TRACKING_DIR / "telegram_state.json"
OUTPUT_FILE = _TRACKING_DIR / "telegram_inbox.json"
CHANNELS_FILE = _TRACKING_DIR / "telegram_channels.json"
PENDING_FILE = _TRACKING_DIR / "telegram_pending_channels.json"

# Days of history to read for a channel that has no stored cursor yet.
DEFAULT_LOOKBACK_DAYS = 30
# Ceiling on messages pulled from one channel in one run, so a very busy
# channel cannot flood the inbox.
MAX_PER_CHANNEL = 500
|
|
|
|
|
|
def load_credentials():
    """Load Telegram API credentials from PROJECT_ROOT/.env and the environment.

    Returns:
        (api_id, api_hash, session_string), with api_id converted to int.

    Exits the process (sys.exit with a readable message) when a variable is
    missing or TELEGRAM_API_ID is not an integer, instead of dumping a
    traceback.
    """
    load_dotenv(PROJECT_ROOT / ".env")
    try:
        api_id = int(os.environ["TELEGRAM_API_ID"])
        api_hash = os.environ["TELEGRAM_API_HASH"]
        session = os.environ["TELEGRAM_SESSION_STRING"]
    except KeyError as e:
        sys.exit(f"missing env var: {e}. check .env in project root.")
    except ValueError:
        # int() rejected the value; report it like a missing variable.
        sys.exit("TELEGRAM_API_ID must be an integer. check .env in project root.")
    return api_id, api_hash, session
|
|
|
|
|
|
def load_json(path, default):
    """Parse the JSON file at *path*, or return *default* if it does not exist.

    The file is decoded as UTF-8 explicitly: JSON is UTF-8 by spec and
    save_json writes non-ASCII text unescaped, so relying on the platform's
    locale encoding could mis-decode or fail on non-UTF-8 systems.

    Args:
        path: pathlib.Path to read.
        default: value returned unchanged when *path* is missing.
    """
    if path.exists():
        return json.loads(path.read_text(encoding="utf-8"))
    return default
|
|
|
|
|
|
def save_json(path, data):
    """Write *data* to *path* as pretty-printed UTF-8 JSON, replacing it atomically.

    Parent directories are created as needed. The payload is written to a
    sibling ``<name>.tmp`` file and then moved over *path* with os.replace, so
    an interrupted run never leaves a half-written file (e.g. a truncated
    state file that would break the next load_json).

    Args:
        path: pathlib.Path of the destination file.
        data: JSON-serializable value; non-ASCII text is kept unescaped.
    """
    # Serialize first so a non-serializable value fails before touching disk.
    payload = json.dumps(data, indent=2, ensure_ascii=False) + "\n"
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_name(path.name + ".tmp")
    try:
        tmp.write_text(payload, encoding="utf-8")
        os.replace(tmp, path)
    except BaseException:
        # Best-effort cleanup of the partial temp file; always re-raise.
        tmp.unlink(missing_ok=True)
        raise
|
|
|
|
|
|
def channel_key(value):
    """Convert a channel reference from the CLI/stdin into the key passed to iter_messages.

    - ints are returned unchanged;
    - strings made of ASCII digits with at most one leading "-" (e.g.
      "-1001234567") become ints;
    - anything else is treated as a username: surrounding whitespace and
      leading "@" characters are removed.

    The numeric test is deliberately strict. str.isdigit() also accepts
    characters such as "²" that int() rejects, and lstrip("-") accepted
    "--5". Both used to raise ValueError here.
    """
    if isinstance(value, int):
        return value
    text = str(value).strip()
    digits = text[1:] if text.startswith("-") else text
    if digits.isdecimal() and digits.isascii():
        return int(text)
    return text.lstrip("@")
|
|
|
|
|
|
def normalize_filter(spec):
    """
    Normalize a channel filter spec to {"groups": [...], "excludes": [...]}.

    Accepted input forms:
      []                                  -> trust-all (no filter)
      ["a", "b"]                          -> single OR-group
      [["a","b"], ["c","d"]]              -> AND of OR-groups
      [["a","b"], "c"]                    -> scalars promoted to 1-item groups
      "a"                                 -> single keyword
      {"include": <any of the above>,
       "exclude": ["x", "y"] or "x"}      -> include + negative filter
    In the dict form, other keys (e.g. lang, priority) are ignored, and a
    missing or null include/exclude means "none".

    A bare string is always ONE keyword. Previously it was iterated character
    by character, so "exclude": "spam" rejected every message containing 's'.

    Filter semantics: message passes if (no exclude keyword matches) AND
    (every include-group has at least one match). Empty include = trust-all
    (only exclude is applied).

    The returned lists are fresh copies; mutating them does not alter *spec*.
    """
    if isinstance(spec, dict):
        include = spec.get("include") or []
        exclude = spec.get("exclude") or []
    else:
        include = spec or []
        exclude = []

    if isinstance(include, str):
        include = [include]
    if isinstance(exclude, str):
        exclude = [exclude]

    if not include:
        groups = []
    elif any(isinstance(item, list) for item in include):
        groups = [list(item) if isinstance(item, list) else [item] for item in include]
    else:
        groups = [list(include)]

    return {"groups": groups, "excludes": list(exclude)}
|
|
|
|
|
|
def msg_passes_filter(text, spec):
    """Decide whether a message body satisfies a channel's keyword filter.

    *spec* is anything normalize_filter accepts. Matching is a substring test
    with both text and keyword lower-cased. Rules:
      - any exclude keyword present -> reject;
      - otherwise each non-empty include-group needs at least one keyword
        present; empty groups, or no groups at all, impose no constraint.
    """
    rules = normalize_filter(spec)
    haystack = text.lower()

    def _hit(keyword):
        return keyword.lower() in haystack

    if any(map(_hit, rules["excludes"])):
        return False
    for group in rules["groups"]:
        if group and not any(map(_hit, group)):
            return False
    return True
|
|
|
|
|
|
def flatten_keywords(keywords_config):
    """Pool every keyword from every channel config into one sorted, deduplicated list.

    Both include keywords (from all groups) and exclude keywords are
    collected. Only exact-string duplicates collapse.
    """
    pool = set()
    for channel_spec in keywords_config.values():
        normalized = normalize_filter(channel_spec)
        pool.update(kw for group in normalized["groups"] for kw in group)
        pool.update(normalized["excludes"])
    return sorted(pool)
|
|
|
|
|
|
def keyword_frequencies(messages, all_keywords):
    """Count, for each keyword, how many messages mention it.

    A message counts at most once per keyword, however often the keyword
    appears in it. The comparison is a lower-cased substring test against
    each message's "text". Keywords with no hits are omitted. The returned
    dict is ordered by descending count; ties keep the order of
    *all_keywords*.
    """
    bodies = [message["text"].lower() for message in messages]
    hits = {}
    for keyword in all_keywords:
        needle = keyword.lower()
        total = sum(needle in body for body in bodies)
        if total:
            hits[keyword] = total
    # sorted() is stable even with reverse=True, so ties keep insertion order.
    ranked = sorted(hits.items(), key=lambda item: item[1], reverse=True)
    return dict(ranked)
|
|
|
|
|
|
async def fetch_channel(client, key, last_id, lookback_dt):
    """Read one channel's messages newer than *last_id* (or within the lookback window).

    Args:
        client: connected Telethon client.
        key: channel username or numeric id (see channel_key).
        last_id: newest message id already processed, or None/0 when the
            channel has no stored cursor.
        lookback_dt: timezone-aware cutoff, applied only when there is no
            cursor; iteration stops at the first older message.

    Returns:
        (messages, max_id, max_date, truncated):
          - messages: chronological (oldest first) list of dicts with id,
            date (ISO string), text (stripped), has_media, and link (a t.me
            URL for username keys, None for numeric ids); messages with
            neither text nor media are dropped;
          - max_id / max_date: newest id seen, including dropped and too-old
            messages, and its date. max_date is None if nothing newer than
            *last_id* was seen;
          - truncated: True when MAX_PER_CHANNEL messages were read without
            reaching the lookback cutoff, so older unseen messages may
            exist. A caller that advances its cursor to max_id will not
            fetch those on later runs.
    """
    # Treat a stored 0 like "no cursor": it disables min_id below, so the
    # lookback cutoff must apply, or the fetch would be unbounded in time.
    use_lookback = not last_id
    messages = []
    max_id = last_id or 0
    max_date = None
    fetched = 0
    hit_lookback = False

    kwargs = {"limit": MAX_PER_CHANNEL}
    if last_id:
        kwargs["min_id"] = last_id

    async for msg in client.iter_messages(key, **kwargs):
        fetched += 1
        # Anchor cursor to newest id we encounter even if we discard the
        # message (too old, no content). iter_messages yields newest-first.
        if msg.id > max_id:
            max_id = msg.id
            max_date = msg.date
        if use_lookback and msg.date < lookback_dt:
            hit_lookback = True
            break
        text = (msg.message or "").strip()
        if not text and not msg.media:
            continue
        messages.append({
            "id": msg.id,
            "date": msg.date.isoformat(),
            "text": text,
            "has_media": bool(msg.media),
            "link": f"https://t.me/{key}/{msg.id}" if isinstance(key, str) else None,
        })

    # Decided after the loop, so a skipped (empty) final message cannot hide
    # that the cap was reached. Previously the check sat after `continue`.
    truncated = fetched >= MAX_PER_CHANNEL and not hit_lookback

    messages.reverse()  # chronological
    return messages, max_id, max_date, truncated
|
|
|
|
|
|
def _dedupe_channels(channels):
    """Return *channels* without repeats (compared as str), keeping first occurrences.

    State, config and output are all keyed by str(raw). A repeated channel
    would be fetched twice, and the second fetch (already past the updated
    cursor) would overwrite the first one's output entry with no messages.
    """
    seen = set()
    unique = []
    for raw in channels:
        name = str(raw)
        if name not in seen:
            seen.add(name)
            unique.append(raw)
    return unique


def _apply_channel_filter(msgs, ch_cfg):
    """Filter a curated channel's messages; return (kept, filter_mode label)."""
    f = normalize_filter(ch_cfg)
    has_constraints = bool(f["groups"] or f["excludes"])
    kept = [m for m in msgs if msg_passes_filter(m["text"], ch_cfg)]
    filter_mode = (
        f"filtered (groups={len(f['groups'])}, excludes={len(f['excludes'])})"
        if has_constraints
        else "trust-all (no filter)"
    )
    return kept, filter_mode


def _pending_entry(msgs, first_run, truncated, all_keywords):
    """Build the keyword-frequency triage record for a channel missing from CHANNELS_FILE."""
    return {
        "messages_scanned": len(msgs),
        "first_run": first_run,
        "truncated": truncated,
        "keyword_counts_from_other_channels": (
            keyword_frequencies(msgs, all_keywords) if all_keywords else {}
        ),
        "note": (
            "decide lang, priority (p1/p2/p3) and keywords (existing or "
            f"new) for this channel; add an entry to "
            f"{CHANNELS_FILE.relative_to(PROJECT_ROOT)} — see tracking/CLAUDE.md"
        ),
    }


def _print_summary(output, channel_count, pending, errors):
    """Print the run summary (inbox total, new channels, fetch errors) to stdout."""
    rel_inbox = OUTPUT_FILE.relative_to(PROJECT_ROOT)
    rel_pending = PENDING_FILE.relative_to(PROJECT_ROOT)
    rel_chans = CHANNELS_FILE.relative_to(PROJECT_ROOT)

    print(
        f"{output['total_in_inbox']} messages in inbox "
        f"(from {channel_count} channels) → {rel_inbox}"
    )
    if pending:
        names = ", ".join(pending.keys())
        print(f"NEW CHANNELS ({len(pending)}): {names}")
        print(f" keyword-frequency scan → {rel_pending}")
        print(f" curate lang/priority/keywords in {rel_chans} (see tracking/CLAUDE.md)")
    if errors:
        # Errors are also in the inbox JSON; echo them so they are not missed.
        print(f"ERRORS ({len(errors)}): {', '.join(errors)} — details in {rel_inbox}")


async def main(channels):
    """Fetch, filter and record new messages for *channels*.

    For each channel (deduplicated, compared as str):
      - fetch messages after the stored cursor (or within the lookback
        window on a first run);
      - curated channels (present in CHANNELS_FILE) are filtered by their
        keyword spec; uncurated ones pass through unfiltered and get a
        keyword-frequency record in PENDING_FILE;
      - a fetch failure is recorded as that channel's "error" entry and the
        run continues with the next channel.
    Then it writes OUTPUT_FILE, writes or removes PENDING_FILE, advances the
    cursors in STATE_FILE, and prints a summary.
    """
    api_id, api_hash, session = load_credentials()
    state = load_json(STATE_FILE, {})
    channels_config = load_json(CHANNELS_FILE, {})
    all_keywords = flatten_keywords(channels_config)
    now = datetime.now(timezone.utc)
    lookback_dt = now - timedelta(days=DEFAULT_LOOKBACK_DAYS)
    channels = _dedupe_channels(channels)

    output = {
        "generated_at": now.isoformat(),
        "lookback_days_for_new_channels": DEFAULT_LOOKBACK_DAYS,
        "channels": {},
        "total_in_inbox": 0,
    }
    pending = {}
    errors = {}

    async with TelegramClient(StringSession(session), api_id, api_hash) as client:
        for raw in channels:
            name = str(raw)
            last_id = state.get(name, {}).get("last_message_id")
            first_run = last_id is None

            ch_cfg = channels_config.get(name)
            is_dict = isinstance(ch_cfg, dict)
            ch_lang = ch_cfg.get("lang") if is_dict else None
            ch_priority = ch_cfg.get("priority") if is_dict else None

            try:
                # channel_key sits inside the try so one malformed argument
                # becomes a per-channel error instead of aborting the run.
                key = channel_key(raw)
                msgs, max_id, max_date, truncated = await fetch_channel(
                    client, key, last_id, lookback_dt
                )
            except Exception as e:
                # Per-channel boundary: record the failure and keep going.
                error = f"{type(e).__name__}: {e}"
                errors[name] = error
                output["channels"][name] = {
                    "lang": ch_lang,
                    "priority": ch_priority,
                    "error": error,
                    "messages": [],
                }
                continue

            if ch_cfg is None:
                # New / untriaged channel — pass everything through unfiltered,
                # but log keyword-frequency scan for the agent to triage.
                kept = msgs
                filter_mode = "unfiltered (new channel — not yet curated)"
                pending[name] = _pending_entry(msgs, first_run, truncated, all_keywords)
            else:
                kept, filter_mode = _apply_channel_filter(msgs, ch_cfg)

            output["channels"][name] = {
                "lang": ch_lang,
                "priority": ch_priority,
                "seen": len(msgs),
                "kept": len(kept),
                "filtered_out": len(msgs) - len(kept),
                "first_run": first_run,
                "truncated": truncated,
                "filter_mode": filter_mode,
                "messages": kept,
            }
            output["total_in_inbox"] += len(kept)

            if max_id > (last_id or 0):
                state[name] = {
                    "last_message_id": max_id,
                    "last_seen_date": max_date.isoformat() if max_date else None,
                }

    # Persist results BEFORE advancing the cursors. If writing the inbox
    # fails, the next run re-fetches these messages instead of losing them.
    save_json(OUTPUT_FILE, output)
    if pending:
        save_json(PENDING_FILE, pending)
    elif PENDING_FILE.exists():
        PENDING_FILE.unlink()
    save_json(STATE_FILE, state)

    _print_summary(output, len(channels), pending, errors)
|
|
|
|
|
|
def parse_args(argv=None, stdin=None):
    """Return the list of channels to fetch.

    Channels come from the positional arguments, or, when the first argument
    is "-", from a JSON array of usernames/ids read from stdin.

    Args:
        argv: argument vector including the program name (defaults to
            sys.argv).
        stdin: text stream to read the JSON array from (defaults to
            sys.stdin).

    Exits with a message on missing arguments, invalid JSON, or a stdin
    payload that is not a non-empty array of strings/ints. Without this
    check, a JSON object or string would be iterated key-by-key or
    char-by-char as channel names, and an empty array would run with no
    channels and delete the pending-channels file.
    """
    argv = sys.argv if argv is None else argv
    if len(argv) < 2:
        sys.exit(
            "usage: fetch_telegram_jobs.py <channel1> [<channel2> ...]\n"
            " fetch_telegram_jobs.py - (read JSON array of channels from stdin)"
        )
    if argv[1] != "-":
        return argv[1:]

    stream = sys.stdin if stdin is None else stdin
    try:
        channels = json.loads(stream.read())
    except json.JSONDecodeError as e:
        sys.exit(f"stdin is not valid JSON: {e}")
    if not isinstance(channels, list) or not all(
        isinstance(c, (str, int)) and not isinstance(c, bool) for c in channels
    ):
        sys.exit("stdin must be a JSON array of channel usernames/ids")
    if not channels:
        sys.exit("no channels given on stdin")
    return channels
|
|
|
|
|
|
if __name__ == "__main__":
    # Parse channel arguments first, then drive the async fetch to completion.
    requested = parse_args()
    asyncio.run(main(requested))
|