# Listing metadata: 162 lines, 5.0 KiB, Python
import os
|
|
import re
|
|
import requests
|
|
import smtplib
|
|
import socket
|
|
import ssl
|
|
import random
|
|
import time
|
|
|
|
from datetime import datetime
|
|
from email.mime.text import MIMEText
|
|
|
|
# Runtime configuration.
#
# Every setting is read from the environment when the module is imported.
# A missing variable raises KeyError and a non-integer value passed to int()
# raises ValueError, so a misconfigured deployment fails before main() runs.

# When true, log() also prints debug lines (level ".").
DEBUG_LOG = os.environ["DEBUG_LOG"].lower() == "true"

# IRC connection settings.
IRC_SERVER = os.environ["IRC_SERVER"]
IRC_PORT = int(os.environ["IRC_PORT"])
# Only the literal string "true" (any case) enables TLS.
USE_TLS = os.environ["IRC_USE_TLS"].lower() == "true"
IRC_NICK = os.environ["IRC_NICK"]
# The USER name and real name sent at registration both reuse the nick.
IRC_USER = IRC_NICK
IRC_REALNAME = IRC_NICK
IRC_CHANNEL = os.environ["IRC_CHANNEL"]

# Regular expression searched for in the channel topic.
TOPIC_REGEX = os.environ["TOPIC_REGEX"]

# SMTP settings used by sendmail(); SMTP_USER is also used as the From address.
SMTP_SERVER = os.environ["SMTP_SERVER"]
SMTP_PORT = int(os.environ["SMTP_PORT"])
SMTP_USER = os.environ["SMTP_USER"]
SMTP_PASS = os.environ["SMTP_PASS"]
EMAIL_TO = os.environ["EMAIL_TO"]

# Inclusive bounds, in seconds, of the random delay between receiving the
# topic and sending QUIT.
MIN_WAIT_TIME = int(os.environ["MIN_WAIT_TIME"])
MAX_WAIT_TIME = int(os.environ["MAX_WAIT_TIME"])

# Inclusive bounds, in seconds, of the random pause between two connections.
MIN_SLEEP_TIME = int(os.environ["MIN_SLEEP_TIME"])
MAX_SLEEP_TIME = int(os.environ["MAX_SLEEP_TIME"])

# Compiled once at import time; an invalid pattern raises re.error here.
topic_pattern = re.compile(TOPIC_REGEX)
|
|
|
|
def log(level, msg):
    """Print one timestamped log line to stdout.

    Args:
        level: Short severity tag printed in brackets. Lines tagged "." are
            debug output and are suppressed unless DEBUG_LOG is true.
        msg: Text printed after the tag.

    The output is flushed immediately so lines show up promptly when stdout
    is piped or captured.
    """
    if level == "." and not DEBUG_LOG:
        return
    # The line is labelled "UTC", so format the UTC clock (time.gmtime())
    # rather than the host's local time.
    ts = time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime())
    print(f"[{ts}] [{level}] {msg}", flush=True)
|
|
|
|
def wait_for_network(host="8.8.8.8", port=53, timeout=3, retry_interval=1):
    """Block until a TCP connection to ``host:port`` can be opened.

    Args:
        host: Address to probe.
        port: TCP port to probe.
        timeout: Seconds allowed for each connection attempt.
        retry_interval: Seconds to sleep after a failed attempt.

    The probe connection is closed as soon as it succeeds; no data is sent.
    """
    log("*", "Waiting for network connection")
    endpoint = (host, port)
    while True:
        log(".", f"Still waiting for network connection, testing {host}:{port}")
        try:
            # Opening the connection is the whole test; the context manager
            # closes the probe socket again right away.
            with socket.create_connection(endpoint, timeout=timeout):
                pass
        except OSError:
            time.sleep(retry_interval)
            continue
        return
|
|
|
|
# Seconds allowed for connecting to, and for each blocking operation on, the
# SMTP server. Without a limit an unresponsive server would stall the caller
# indefinitely.
_SMTP_TIMEOUT = 30


def sendmail(subject, message):
    """Send a plain-text email through the configured SMTP server.

    The message goes from SMTP_USER to EMAIL_TO over a STARTTLS-upgraded
    connection, authenticated with SMTP_USER / SMTP_PASS.

    Args:
        subject: Value of the Subject header.
        message: Plain-text body.

    This function is best-effort: any failure (including a timeout or a
    certificate verification error) is logged and swallowed, never raised.
    """
    try:
        mail = MIMEText(message)
        mail["Subject"] = subject
        mail["From"] = SMTP_USER
        mail["To"] = EMAIL_TO

        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT, timeout=_SMTP_TIMEOUT) as server:
            # Use a verifying TLS context, as the IRC connection does, so the
            # credentials below are only sent to a server whose certificate
            # and hostname check out.
            server.starttls(context=ssl.create_default_context())
            server.login(SMTP_USER, SMTP_PASS)
            server.send_message(mail)
        log("+", f"Email sent to {EMAIL_TO}")
    except Exception as e:
        log("!", f"Failed to send error email: {e}")
|
|
|
|
def _irc_send(sock, command):
    """Send one IRC command (CRLF is appended here) and debug-log the raw bytes."""
    payload = f"{command}\r\n".encode()
    sock.sendall(payload)
    log(".", f"> {payload}")


def _join_and_read_topic(sock):
    """Register, join IRC_CHANNEL and return its topic.

    Returns None if the server closes the connection before a topic reply
    (numeric 332) addressed to IRC_NICK has been received.
    """
    _irc_send(sock, f"NICK {IRC_NICK}")
    _irc_send(sock, f"USER {IRC_USER} 0 * :{IRC_REALNAME}")

    joined = False
    # Bytes received so far that do not yet end in CRLF. A recv() chunk can
    # stop in the middle of a line (or of a multi-byte character), so only
    # complete lines are decoded and the remainder is carried over.
    pending = b""
    while True:
        chunk = sock.recv(4096)
        if not chunk:
            return None
        pending += chunk
        *complete, pending = pending.split(b"\r\n")

        for raw in complete:
            line = raw.decode(errors="ignore")
            if not line:
                continue
            log(".", f"< {line}")

            # Answer PING by swapping only the command word; the token that
            # follows is echoed back untouched.
            if line.startswith("PING"):
                _irc_send(sock, "PONG" + line[4:])

            # 001 is the welcome reply: registration is done, join once.
            if " 001 " in line and not joined:
                _irc_send(sock, f"JOIN {IRC_CHANNEL}")
                joined = True

            # Topic reply sent on join: ":server 332 nick #channel :topic here"
            if f" 332 {IRC_NICK} " in line:
                parts = line.split(":", 2)
                if len(parts) == 3:
                    return parts[2]
        # NOTE(review): nothing here ends the wait when the channel has no
        # topic; the loop then runs until the server hangs up or recv() hits
        # the socket timeout.


def connect_to_channel_and_check_topic():
    """Run one IRC session: connect, read the channel topic, notify, quit.

    Connects to IRC_SERVER:IRC_PORT (over TLS when USE_TLS is set), joins
    IRC_CHANNEL and reads the topic sent on join. If the topic matches
    ``topic_pattern`` an email is sent via sendmail(). The function then
    waits a random MIN_WAIT_TIME..MAX_WAIT_TIME seconds and sends QUIT.

    The socket is always closed before returning or raising.

    Raises:
        OSError: On connection, TLS or socket errors, including the 60 second
            socket timeout.
    """
    # Built separately so the f-string below also parses on Python < 3.12.
    transport = "with TLS" if USE_TLS else "plain"
    log("*", f"Connecting to {IRC_SERVER}:{IRC_PORT} {transport}")

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.settimeout(60)
        if USE_TLS:
            context = ssl.create_default_context()
            sock = context.wrap_socket(sock, server_hostname=IRC_SERVER)
        sock.connect((IRC_SERVER, IRC_PORT))

        topic = _join_and_read_topic(sock)
        if topic is None:
            log("!", "Server closed the connection before a topic was received")
        else:
            log("*", f"Channel topic on join: {topic}")
            if topic_pattern.search(topic):
                log("+", f"Found topic, sending mail to {EMAIL_TO}")
                sendmail("IRC TOPIC BOT: Found topic", topic)

            # Stay connected for a random while before leaving.
            wait_time = random.randint(MIN_WAIT_TIME, MAX_WAIT_TIME)
            log("*", f"Waiting for {wait_time} seconds before disconnecting")
            time.sleep(wait_time)
            _irc_send(sock, "QUIT :Bye!")
    finally:
        # Release the socket on every path, including errors.
        sock.close()
    log("*", "Disconnected")
|
|
|
|
def main():
    """Run the bot forever: check the topic, sleep a random while, repeat.

    Waits for network connectivity, logs the public IP address as reported by
    api.ipify.org (best-effort), then loops over IRC sessions separated by a
    random MIN_SLEEP_TIME..MAX_SLEEP_TIME second pause.
    """
    wait_for_network()

    # The public IP is informational only, so a failed lookup is just logged.
    try:
        res = requests.get("https://api.ipify.org", timeout=10)
        res.raise_for_status()
        ip = res.text.strip()
        log("*", f"Public IP is {ip}")
    except Exception as e:
        log("!", f"Could not get public IP, {e}")

    while True:
        try:
            connect_to_channel_and_check_topic()
        except OSError as e:
            # Timeouts, refused connections, DNS failures and TLS errors are
            # all OSError subclasses. They are expected for a long-running
            # network client, so log them and retry after the usual pause
            # instead of letting them end the loop.
            log("!", f"IRC session failed: {e}")
        sleep_time = random.randint(MIN_SLEEP_TIME, MAX_SLEEP_TIME)
        log("*", f"Sleeping for {sleep_time} seconds before reconnecting")
        time.sleep(sleep_time)
|
|
|
|
# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|
|