# Code-viewer metadata: 70 lines, 2.5 KiB, Python
import traceback
|
|
from requests import session, RequestException, HTTPError
|
|
|
|
from CTFd.utils import get_config
|
|
from .base import BaseRouter
|
|
from ..db import DBContainer, WhaleContainer
|
|
|
|
|
|
class TrpRouter(BaseRouter):
    """Router that exposes containers through per-container TRP rules.

    Every container is addressed by a ``<uuid>.<suffix>`` domain (see
    :meth:`get_domain`).  Rules for those domains are created and removed
    through the HTTP API whose base URL is stored under the
    ``whale:trp_api_url`` config key.

    All public methods that talk to the API return a ``(ok, message)`` tuple
    instead of raising, so callers can surface ``message`` directly.
    """

    name = "trp"

    # Upper bound (seconds) for any single call to the TRP API.  Without a
    # timeout ``requests`` waits forever, so an unresponsive API would hang
    # the caller.  ``requests.Timeout`` is a ``RequestException`` and is
    # therefore handled by the existing ``except`` clauses below.
    _REQUEST_TIMEOUT = 10

    def __init__(self):
        """Open an HTTP session and (re-)register every alive container."""
        super().__init__()
        self.ses = session()
        # Trailing slashes are stripped so URLs can be built as f'{url}/...'.
        self.url = get_config('whale:trp_api_url', '').rstrip("/")
        # NOTE(review): not read anywhere in this class; presumably kept for
        # parity with other routers -- confirm before removing.
        self.common = ''
        # Registration results are intentionally ignored here (best effort).
        for container in DBContainer.get_all_alive_container():
            self.register(container)

    @staticmethod
    def get_domain(container: WhaleContainer):
        """Return the domain for *container*: ``<container.uuid>.<suffix>``.

        The suffix comes from ``whale:trp_domain_suffix`` (default
        ``127.0.0.1.nip.io``); leading dots are removed so that exactly one
        dot separates the uuid from the suffix.
        """
        suffix = get_config(
            'whale:trp_domain_suffix', '127.0.0.1.nip.io'
        ).lstrip('.')
        return f'{container.uuid}.{suffix}'

    def access(self, container: WhaleContainer):
        """Return the connection hint shown for *container*.

        The format depends on ``container.challenge.redirect_type``:

        * ``'direct'`` -- a pwntools snippet (``<br>`` is the line break of
          the returned markup),
        * ``'http'``   -- an ``https://`` URL, with the port appended unless
          it is 443,
        * anything else -- ``[ssl] <domain> <port>``.
        """
        ch_type = container.challenge.redirect_type
        domain = self.get_domain(container)
        port = get_config('whale:trp_listening_port', 1443)
        if ch_type == 'direct':
            return f'from pwn import *<br>remote("{domain}", {port}, ssl=True).interactive()'
        if ch_type == 'http':
            return f'https://{domain}' + (f':{port}' if port != 443 else '')
        return f'[ssl] {domain} {port}'

    def register(self, container: WhaleContainer):
        """Create the TRP rule that routes the container's domain to it.

        Returns:
            ``(True, 'success')`` when the API answers with a 2xx status,
            ``(False, <response body>)`` when it answers with an error
            status, and ``(False, 'unable to access trp Api')`` for any
            other request failure (connection error, timeout, ...).
        """
        try:
            resp = self.ses.post(
                f'{self.url}/rule/{self.get_domain(container)}',
                json={
                    'target': f'{container.user_id}-{container.uuid}:{container.challenge.redirect_port}',
                    'source': None,
                },
                timeout=self._REQUEST_TIMEOUT,
            )
            resp.raise_for_status()
            return True, 'success'
        except HTTPError as e:
            # Raised by raise_for_status(); the API's body explains the error.
            return False, e.response.text
        except RequestException:
            print(traceback.format_exc())
            return False, 'unable to access trp Api'

    def unregister(self, container: WhaleContainer):
        """Delete the TRP rule belonging to the container's domain.

        Returns the same ``(ok, message)`` tuples as :meth:`register`.
        """
        try:
            resp = self.ses.delete(
                f'{self.url}/rule/{self.get_domain(container)}',
                timeout=self._REQUEST_TIMEOUT,
            )
            resp.raise_for_status()
            return True, 'success'
        except HTTPError as e:
            # Raised by raise_for_status(); the API's body explains the error.
            return False, e.response.text
        except RequestException:
            print(traceback.format_exc())
            return False, 'unable to access trp Api'

    def check_availability(self):
        """Probe ``<url>/rules`` and report whether the TRP API is usable.

        The API counts as available only if the request succeeds, the
        status is not an error, and the body parses as JSON.

        Returns:
            ``(True, 'Available')`` on success, otherwise ``(False, reason)``.
        """
        try:
            resp = self.ses.get(
                f'{self.url}/rules', timeout=self._REQUEST_TIMEOUT
            )
            # A 4xx/5xx answer with a JSON body must not count as available.
            resp.raise_for_status()
            # Parsed only to validate that the reply is well-formed JSON.
            resp.json()
        except RequestException:
            return False, 'Unable to access trp admin api'
        except Exception:
            # E.g. a plain ValueError from .json() on older requests versions.
            return False, 'Unknown trp error'
        return True, 'Available'
|