Changeset - 062b26d5bc51
[Not reviewed]
0 0 2
U-Travelm8\Кирилл - 10 months ago 2021-01-30 12:03:50
akirill94@gmail.com
Initial commit
2 files changed with 223 insertions and 0 deletions:
0 comments (0 inline, 0 general)
gemini_sgi.py
Show inline comments
 
new file 100755
 
import os
 
import sys
 
import mimetypes
 
import ssl
 
import traceback
 
import importlib
 
from urllib.parse import urlparse
 
from socketserver import ThreadingTCPServer, BaseRequestHandler
 

	
 
def default_application(environ, start_response):
 
    start_response(20, 'text/gemini')
 
    return b'It works!\n'
 

	
 
class GeminiRequestException(Exception):
 
    def __init__(self, message, code=50):
 
        super().__init__(message)
 
        self.code = code
 

	
 
class GeminiResponse(object):
 
    def __init__(self):
 
        self.content = None
 
        self.status = 20
 
        self.meta = None
 

	
 
class GeminiRequestHandler(BaseRequestHandler):
 

	
 
    def handle(self):
 
        req = self.request.read(1024)
 
        request_line = req.decode('utf-8').rstrip('\r\n')
 
        urlparts = urlparse(request_line)
 
        print('[info] {}:{} {} {}'.format(*self.client_address, urlparts.netloc, urlparts.path))
 
        try:
 
            if urlparts.scheme != 'gemini':
 
                raise GeminiRequestException('Inappropriate protocol', 50)
 

	
 
            response = GeminiResponse()
 
            environ = {
 
                'wsgi.version': (0, 1),
 
                'wsgi.urlscheme': urlparts.scheme,
 
                'REQUEST_METHOD': 'GET',
 
                'SCRIPT_NAME': '',
 
                'QUERY_STRING': urlparts.query,
 
                'PATH_INFO': urlparts.path,
 
                'SERVER_NAME': urlparts.hostname,
 
                'SERVER_PORT': urlparts.port or '1965',
 
                'SERVER_PROTOCOL': 'GEMINI',
 
                'REMOTE_ADDR': self.client_address[0],
 
                'REMOTE_PORT': str(self.client_address[1])
 
            }
 
            def start_response(status, meta):
 
                response.status = status
 
                response.meta = meta
 
            content = self.server.wsgi_app(environ, start_response)
 
            self.request.sendall('{} {}\r\n'.format(response.status, response.meta).encode('utf-8'))
 
            if content:
 
                self.request.sendall(content)
 
        except GeminiRequestException as e:
 
            self.request.sendall('{} {}\r\n'.format(e.code, str(e)).encode('utf-8'))
 
        except Exception as e:
 
            print(traceback.format_exc())
 
            self.request.sendall(b'50 Internal Server Error\r\n')
 

	
 
class GeminiWSGIServer(ThreadingTCPServer):
 
    def __init__(self, host, port, certfile, keyfile, wsgi_app):
 
        super().__init__((host, port), GeminiRequestHandler)
 
        self.socket = ssl.wrap_socket(self.socket, certfile=certfile, keyfile=keyfile, server_side=True)
 
        self.allow_reuse_address = True
 
        self.wsgi_app = wsgi_app
 
        print('Serving Gemini on {} port {} ...'.format(host, port))
 

	
 
def main(arguments):
 
    configuration = {
 
        'host': 'localhost',
 
        'port': 1965,
 
        'certificate_file': None,
 
        'key_file': None,
 
        'application': default_application
 
    }
 
    while len(arguments) > 0:
 
        option = arguments.pop(0)
 
        if option == '-h' or option == '--host':
 
            configuration['host'] = arguments.pop(0)
 
        elif option == '-p' or option == '--port':
 
            configuration['port'] = int(arguments.pop(0))
 
        elif option == '-c' or option == '--cert':
 
            configuration['certificate_file'] = arguments.pop(0)
 
        elif option == '-k' or option == '--certkey':
 
            configuration['key_file'] = arguments.pop(0)
 
        elif option == '-a' or option == '--app':
 
            configuration['application'] = importlib.import_module(arguments.pop(0)).application
 
    server = GeminiWSGIServer(
 
        configuration['host'],
 
        configuration['port'],
 
        configuration['certificate_file'],
 
        configuration['key_file'],
 
        configuration['application']
 
    )
 
    server.serve_forever()
 

	
 
if __name__ == '__main__':
 
    main(sys.argv[1:])
 
\ No newline at end of file
serpens.py
Show inline comments
 
new file 100755
 
import re
 
 
KEY_TYPES = {
 
    'any': '(?P<{}>.+?)',
 
    'str': '(?P<{}>[^/]+?)'
 
}
 
 
def parse_route(pattern):
 
    output = ''
 
    token_start = 0
 
    current = 0
 
    mode = 0
 
    keys = []
 
    while current < len(pattern):
 
        if mode == 0:
 
            if pattern[current] == '{':
 
                output += re.escape(pattern[token_start:current])
 
                token_start = None
 
                mode = 1
 
            else:
 
                if token_start is None:
 
                    token_start = current
 
        else:
 
            if pattern[current] == '{':
 
                raise ValueError('Nested braces aren\'t allowed')
 
            elif pattern[current] == '}':
 
                key = pattern[token_start:current]
 
                if key.find(':') != -1:
 
                    key_type, key = key.split(':')
 
                    output += KEY_TYPES[key_type].format(key)
 
                else:
 
                    output += KEY_TYPES['str'].format(key)
 
                keys.append(key)
 
                mode = 0
 
                token_start = None
 
            else:
 
                if token_start is None:
 
                    token_start = current
 
        current += 1
 
    if mode == 1:
 
        raise ValueError('Braces aren\'t closed')
 
    if token_start is not None:
 
        output += re.escape(pattern[token_start:])
 
    return '^{}$'.format(output), keys
 
 
class SerpensResponse(object):
 
    __slots__ = ['content', 'status', 'mediatype', 'language', 'charset']
 
    def __init__(self, content, status, mediatype, language=None, charset=None):
 
        self.content = content
 
        self.status = status
 
        self.mediatype = mediatype
 
        self.language = language
 
        self.charset = charset
 
 
    @property
 
    def meta(self):
 
        meta_line = self.mediatype
 
        if self.language:
 
            meta_line += '; lang=' + self.language
 
        if self.charset:
 
            meta_line += '; charset=' + self.charset
 
 
class SerpensRequest(object):
 
    def __init__(self, query, path, remote_addr, hostname, port):
 
        self.query = query
 
        self.path = path
 
        self.remote_addr = remote_addr
 
        self.hostname = hostname
 
        self.port = port
 
 
class Serpens(object):
 
    def __init__(self):
 
        self.routes = []
 
 
    def route(self, rule):
 
        def wrapper(callback):
 
            regex, keys = parse_route(rule)
 
            self.routes.append((re.compile(regex), keys, callback))
 
            return callback
 
        return wrapper
 
 
    def dispatch(self, request, path):
 
        for rule, keys, callback in self.routes:
 
            m = rule.match(path)
 
            if m:
 
                kwargs = {item: m.group(item) for item in keys}
 
                return callback(request, **kwargs)
 
        return None
 
 
    def wsgi(self, environ, start_response):
 
        request = SerpensRequest(
 
            environ['QUERY_STRING'],
 
            environ['PATH_INFO'],
 
            environ['REMOTE_ADDR'],
 
            environ['SERVER_NAME'],
 
            environ['SERVER_PORT']
 
        )
 
        response = self.dispatch(request, environ['PATH_INFO'])
 
        if response is None:
 
            start_response(51, 'Not found')
 
            return
 
        elif isinstance(response, str):
 
            start_response(20, 'text/gemini')
 
            return response.encode('utf-8')
 
        elif isinstance(response, bytes):
 
            start_response(20, 'application/octet-stream')
 
            return response.encode('utf-8')
 
        elif isinstance(response, SerpensResponse):
 
            start_response(response.status, response.meta)
 
            return response.content
 
        else:
 
            start_response(42, 'Internal Error')
 
            return
 
 
def redirect(url):
 
    return SerpensResponse(None, 30, url)
 
 
def error(message, status=40):
 
    return SerpensResponse(None, status, message)
 
 
def input_required(prompt):
 
    return SerpensResponse(None, 10, prompt)
 
\ No newline at end of file
0 comments (0 inline, 0 general)