#!/usr/bin/env python3
"""CGI handler that gates a request behind a TOTP second factor.

Flow, as implemented below:

* A session key is taken from the ``key`` query parameter, else from the
  ``2FA_Auth`` cookie, else freshly generated (and sent back as a cookie).
* A POST may carry ``otp``, ``key`` and ``original_url`` form fields.
* If the posted OTP matches the TOTP derived from the user's secret in
  ``PASSWD2F_FILE``, an empty marker file named after the session key is
  created in ``STATE_DIR``; the client then gets either a confirmation page
  (key-based flow) or a 302 back to the original URL (cookie flow).
* Otherwise a ``499 2FA Required`` response with the challenge page is sent.

NOTE(review): the page bodies below contain text only.  The markup (and the
OTP form) appears to have been stripped from the copy of the file this was
rewritten from -- restore it from the deployed version if it still exists.
"""
import base64
import hashlib
import hmac
import os
import re
import struct
import sys
import time
import urllib.parse
from http import cookies as Cookie

# Paths and constants
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
STATE_DIR = os.path.abspath(os.path.join(BASE_DIR, '..', 'state'))
PASSWD2F_FILE = '/opt/passwd2fa'
LOG_FILE = '/opt/2fa/logfile.txt'  # currently unused: log_debug writes to stderr
COOKIE_NAME = '2FA_Auth'

TOTP_STEP_SECONDS = 30
# Upper bound on how much of a POST body is read; the form is tiny.
MAX_POST_BYTES = 8192
# Session keys become file names inside STATE_DIR and are echoed into a
# Location header, so only characters that are inert in both are accepted.
_SAFE_KEY_RE = re.compile(r'[A-Za-z0-9_-]{1,255}')

SUCCESS_PAGE = """ 2FA Successful

Authentication Successful

You may close this window and continue in your SVN client.

"""

CHALLENGE_PAGE = """ Two-Factor Authentication

Two-Factor Authentication

Please enter the one-time password (OTP) sent to your device.

"""

INVALID_OTP_NOTICE = '\nInvalid OTP\n'


def log_debug(message):
    """Write one timestamped trace line to stderr.

    CR/LF in *message* are escaped so that request-controlled values cannot
    forge additional log lines.
    """
    ts = time.strftime('%Y-%m-%d %H:%M:%S')
    flat = str(message).replace('\r', '\\r').replace('\n', '\\n')
    sys.stderr.write(f"[trace6] {ts} {flat}\n")


def _redact(key):
    """Return a short, non-replayable prefix of a session key for logging."""
    return f'{key[:6]}...'


def _is_safe_key(key):
    """Return True if *key* is usable as a state-file name and header value."""
    return isinstance(key, str) and _SAFE_KEY_RE.fullmatch(key) is not None


def _new_auth_key():
    """Generate a random alphanumeric session key."""
    encoded = base64.b64encode(os.urandom(80)).decode('ascii')
    return ''.join(ch for ch in encoded if ch.isalnum())


def _cookie_key():
    """Return the session key sent in the request cookie, or None."""
    jar = Cookie.SimpleCookie()
    try:
        jar.load(os.environ.get('HTTP_COOKIE', ''))
    except Cookie.CookieError as exc:
        log_debug(f'Ignoring unparsable cookie header: {exc}')
        return None
    morsel = jar.get(COOKIE_NAME)
    return morsel.value if morsel is not None else None


def _session_cookie(auth_key):
    """Build a jar holding only the session cookie.

    A fresh jar is used on purpose: printing the parsed request jar would
    echo every cookie the client sent back as a Set-Cookie header.
    """
    jar = Cookie.SimpleCookie()
    jar[COOKIE_NAME] = auth_key
    morsel = jar[COOKIE_NAME]
    morsel['path'] = '/'  # no expiry: session cookie, dropped on browser close
    morsel['httponly'] = True
    if os.environ.get('HTTPS', '').lower() in ('on', '1'):
        morsel['secure'] = True
    return jar


def _initial_url():
    """Reconstruct the originally requested path (plus query) from the env."""
    env = os.environ
    orig_path = env.get('REDIRECT_URL') or env.get('REQUEST_URI') or '/'
    # REQUEST_URI already carries the query string; drop it so it is not
    # appended a second time below.
    orig_path = orig_path.split('?', 1)[0]
    query = env.get('REDIRECT_QUERY_STRING') or env.get('QUERY_STRING', '')
    return orig_path + ('?' + query if query else '')


def _is_local_path(url):
    """Return True if *url* is a same-site absolute path safe for Location."""
    if not url.startswith('/') or url.startswith('//') or '\\' in url:
        return False
    return not any(ord(ch) < 0x20 or ord(ch) == 0x7F for ch in url)


def _safe_redirect_target(candidate, fallback):
    """Pick the first of *candidate*, *fallback*, '/' that is a local path."""
    for url in (candidate, fallback):
        if _is_local_path(url):
            return url
    return '/'


def _read_post_body(clength):
    """Read at most MAX_POST_BYTES of the request body and decode it."""
    try:
        length = int(clength)
    except ValueError:
        length = 0
    if length <= 0:
        return ''
    # CONTENT_LENGTH counts bytes, so read from the binary stream.
    raw = sys.stdin.buffer.read(min(length, MAX_POST_BYTES))
    return raw.decode('utf-8', 'replace')


def calculate_totp(secret, *, for_time=None):
    """Return the 6-digit HMAC-SHA1 TOTP for a base32 *secret*.

    Args:
        secret: Base32 secret; spaces, lower case and missing padding are
            tolerated.
        for_time: Unix timestamp to compute the code for.  Defaults to the
            current time.

    Returns:
        The code as a zero-padded 6-character string, or None if the secret
        is empty or not valid base32.
    """
    b32 = secret.strip().replace(' ', '').upper()
    b32_padded = b32 + '=' * ((-len(b32)) % 8)
    try:
        key_bytes = base64.b32decode(b32_padded)
    except ValueError as exc:  # binascii.Error is a ValueError subclass
        # Only the error is logged -- never the secret itself.
        log_debug(f'Error decoding secret: {exc}')
        return None
    if not key_bytes:
        return None

    moment = time.time() if for_time is None else for_time
    counter = int(moment) // TOTP_STEP_SECONDS
    digest = hmac.new(key_bytes, struct.pack('>Q', counter), hashlib.sha1).digest()
    # Dynamic truncation: low nibble of the last byte selects a 4-byte window.
    offset = digest[-1] & 0xF
    word = struct.unpack('>I', digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return f"{word % 1000000:06d}"


def _lookup_secret(username):
    """Return the stored secret for *username* from PASSWD2F_FILE, or None.

    Lines are ``user:secret``.  Lines without a colon are skipped so that a
    single malformed line cannot hide every entry after it.
    """
    try:
        with open(PASSWD2F_FILE) as handle:
            for raw_line in handle:
                user, sep, otp_secret = raw_line.strip().partition(':')
                if sep and user == username:
                    return otp_secret.strip() or None
    except (OSError, UnicodeDecodeError) as exc:
        log_debug(f'Error reading secrets: {exc}')
    return None


def _otp_is_valid(username, posted_otp):
    """Return True if *posted_otp* equals the current TOTP for *username*."""
    secret = _lookup_secret(username)
    if not secret:
        return False
    current = calculate_totp(secret)
    if not current:
        return False
    # Constant-time comparison; bytes are used because compare_digest
    # rejects non-ASCII str arguments.
    return hmac.compare_digest(posted_otp.encode('utf-8'), current.encode('ascii'))


def _record_success(auth_key):
    """Create the empty state file for *auth_key*; return True on success."""
    try:
        with open(os.path.join(STATE_DIR, auth_key), 'w'):
            pass
    except OSError as exc:
        log_debug(f'Error writing state file: {exc}')
        return False
    log_debug(f'State file created: {_redact(auth_key)}')
    return True


def _send_bad_request(reason):
    """Emit a minimal 400 response and log why."""
    log_debug(f'Rejecting request: {reason}')
    print('Status: 400 Bad Request')
    print('Content-Type: text/plain')
    print()
    print('Bad Request')


def main():
    """Handle one CGI request: read the environment, write the response."""
    env = os.environ
    log_debug('Script started')
    method = env.get('REQUEST_METHOD') or ''
    ctype = env.get('CONTENT_TYPE', '')
    clength = env.get('CONTENT_LENGTH', '')
    log_debug(f"Request method: {method}, Content-Type: {ctype}, Content-Length: {clength}")

    # Determine original request URL
    initial_url = _initial_url()
    log_debug(f'Initial URL: {initial_url}')

    # Session key: explicit ?key=... wins, then the cookie, then a new one.
    params = urllib.parse.parse_qs(env.get('QUERY_STRING', ''))
    key_param = params.get('key', [None])[0]
    key_provided = bool(key_param)
    cookie = None
    if key_provided:
        if not _is_safe_key(key_param):
            _send_bad_request('malformed key in query string')
            return
        auth_key = key_param
        log_debug(f'Using key from GET: {_redact(auth_key)}')
    else:
        auth_key = _cookie_key()
        if auth_key is not None and _is_safe_key(auth_key):
            log_debug(f'Using existing cookie: {_redact(auth_key)}')
        else:
            auth_key = _new_auth_key()
            log_debug(f'Generated new cookie: {_redact(auth_key)}')
        cookie = _session_cookie(auth_key)

    # Validate REMOTE_USER
    username = env.get('REMOTE_USER', '')
    log_debug(f'Remote user: {username}')
    if not username:
        print('Status: 401 Unauthorized')
        if cookie is not None:
            print(cookie)
        print()
        return

    # Parse OTP from POST
    posted_otp = None
    return_url = initial_url
    if method.upper() == 'POST':
        body = _read_post_body(clength)
        log_debug(f'POST body received: {len(body)} characters')
        post_params = urllib.parse.parse_qs(body)
        posted_otp = post_params.get('otp', [None])[0]
        if posted_otp is not None:
            posted_otp = posted_otp.strip()
        post_key = post_params.get('key', [None])[0]
        if post_key:
            if not _is_safe_key(post_key):
                _send_bad_request('malformed key in POST body')
                return
            auth_key = post_key
            key_provided = True
            cookie = None  # key-based flow: no cookie is sent back
            log_debug(f'Using key from POST: {_redact(auth_key)}')
        return_url = post_params.get('original_url', [initial_url])[0]
    return_url = _safe_redirect_target(return_url, initial_url)
    log_debug(f'OTP posted: {posted_otp is not None}, return_url: {return_url}')

    # Validate OTP
    auth_success = False
    otp_rejected = False
    if posted_otp is not None:
        if _otp_is_valid(username, posted_otp):
            # Only report success once the state file actually exists.
            auth_success = _record_success(auth_key)
        else:
            otp_rejected = True

    # Output
    if auth_success:
        if key_provided:
            print('Content-Type: text/html')
            print()
            print(SUCCESS_PAGE)
        else:
            print('Status: 302 Found')
            print(f'Location: {return_url}')
            print()
            print()
        return

    # Failure or initial page.
    # NOTE(review): 499 is a non-standard status; presumably the front-end
    # configuration or client keys off it -- confirm before changing.
    print('Status: 499 2FA Required')
    print(f'Location: /auth/failed?key={auth_key}')
    if cookie is not None:
        print(cookie)
    print('Content-Type: text/html')
    print()
    print(CHALLENGE_PAGE + (INVALID_OTP_NOTICE if otp_rejected else '') + '\n')


if __name__ == '__main__':
    main()