# urlnorm.py - Normalize URLs
# Copyright (C) 2010 Kurt McKee <contactme@kurtmckee.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17
# Module author and contact address.
__author__ = "Kurt McKee <contactme@kurtmckee.org>"
18
19
import re
20
import urllib
21
import urlparse
22
23
# Default port of each supported scheme, stored as text so it can be
# compared directly with the port string captured from a network location.
DEFAULT_PORTS = {
    'http': u'80',
    'https': u'443',
}
27
28
# Splits a network location of the form [username[:password]@]hostname[:port]
# into the named groups username, password, hostname and port.  Groups that
# are absent from the input are reported as None.
NETLOC = re.compile(r"""
    ^
    (?:
        (?P<username>[^:@]+)?
        (?:
            :
            (?P<password>[^@]*)
        )?
        @
    )?
    (?P<hostname>[^:]+)
    (?:
        :
        (?P<port>[0-9]*)
    )?
    $
    """, re.VERBOSE,
)
46
47
# A percent-encoded octet; group 1 captures the two hexadecimal digits.
PERCENT_ENCODING = re.compile(r"%([0-9a-f]{2})", re.IGNORECASE)
# Any single character outside the set left unescaped in query names/values.
UNACCEPTABLE_QUERY_CHARS = re.compile(r"([^A-Za-z0-9_.~/-])")
49
50
# http://www.pc-help.org/obscure.htm
# http://www.securelist.com/en/blog/148/
# Translate an IP address written in octal, decimal, or hexadecimal
# into a base-10 dotted quad (like 127.0.0.1)
54
# Matches a hostname made of one to four dot-separated numeric fields, each
# either a run of decimal digits or a 0x-prefixed hexadecimal number.  The
# final field is always captured as o3; leading fields fill o0, o1 and o2.
NUMERIC_IP = re.compile(r"""
    ^
    (?:
        (?P<o0>(?:[0-9]+)|(?:0x[0-9a-f]+))
        [.]
    )?
    (?:
        (?P<o1>(?:[0-9]+)|(?:0x[0-9a-f]+))
        [.]
    )?
    (?:
        (?P<o2>(?:[0-9]+)|(?:0x[0-9a-f]+))
        [.]
    )?
    (?P<o3>(?:[0-9]+)|(?:0x[0-9a-f]+))
    $
    """, re.VERBOSE | re.IGNORECASE
)
72
73
_pre_plugins = []
74
_post_plugins = []
75
76
def register_pre_plugin(fn):
    """Register *fn* as a pre-plugin.

    urlnorm() calls every pre-plugin, in registration order, with the URL
    string before it is parsed and continues with the value *fn* returns.
    """
    _pre_plugins.append(fn)
78
def register_post_plugin(fn):
    """Register *fn* as a post-plugin.

    urlnorm() calls every post-plugin, in registration order, with the dict
    of normalized URL parts and merges the mapping *fn* returns back into
    that dict before the URL is re-assembled.
    """
    _post_plugins.append(fn)
80
81
def urlnorm(url, base=None):
    """Return a normalized form of *url*.

    The URL is stripped of surrounding whitespace and embedded CR/LF
    characters, a leading ``feed:`` prefix is dropped, and, when *base* is
    given, the result is resolved against it.  Pre-plugins then run on the
    string, percent-encodings are normalized, and the URL is split into
    parts whose scheme, port, path, hostname and query are normalized
    individually.  Post-plugins may update the parts before they are joined
    back into a string.

    If the URL cannot be treated as an http(s) URL, the original *url*
    argument is returned unchanged.
    """
    newurl = url.strip().replace('\n', '').replace('\r', '')
    if newurl.lower().startswith('feed:'):
        newurl = newurl[len('feed:'):]
    if base is not None:
        newurl = urlparse.urljoin(base.strip(), newurl)
    for fn in _pre_plugins:
        newurl = fn(newurl)
    newurl = _normalize_percent_encoding(newurl)

    parts = _urlparse(newurl)
    if parts is None:
        # Unsupported scheme: hand back exactly what the caller passed in.
        return url

    # Adds the username, password, hostname and port keys.
    parts.update(_split_netloc(parts['netloc']))
    parts['scheme'] = _normalize_scheme(parts['scheme'])
    # The port is compared against the already-normalized scheme's default.
    parts['port'] = _normalize_port(parts['port'], parts['scheme'])
    parts['path'] = _normalize_path(parts['path'])
    parts['hostname'] = _normalize_hostname(parts.get('hostname', ''))
    parts['query'] = _split_query(parts['query'])
    for fn in _post_plugins:
        parts.update(fn(parts))
    return _join_parts(parts)
103
104
def _urlparse(url):
105
    parts = dict(zip(('scheme', 'netloc', 'path', 'params', 'query', 'fragment'),
106
                     urlparse.urlparse(url)
107
                ))
108
    if (not parts['scheme'] and not parts['netloc']) or \
109
        (
110
            not parts['netloc'] and
111
            parts['path'] and
112
            parts['path'][0] in map(str, range(10)) and
113
            url.startswith('%s:%s' % (parts['scheme'], parts['path']))
114
        ):
115
        # url may not have included a scheme, like 'domain.example'
116
        # url may have been in the form 'domain.example:8080'
117
        parts = dict(zip(('scheme', 'netloc', 'path', 'params', 'query', 'fragment'),
118
                         urlparse.urlparse('http://%s' % url)
119
                    ))
120
    elif parts['scheme'].lower() not in ('http', 'https'):
121
        return None
122
    return parts
123
124
def _join_parts(parts):
125
    url = '%s://' % parts['scheme']
126
    if parts['username']:
127
        url += parts['username']
128
        if parts['password']:
129
            url += ':%s' % parts['password']
130
        url += '@'
131
    url += parts['hostname']
132
    if parts['port']:
133
        url += ':%s' % parts['port']
134
    url += parts['path']
135
    if parts['params']:
136
        url += ';%s' % parts['params']
137
    if parts['query']:
138
        url += '?%s' % _join_query(parts['query'])
139
    if parts['fragment']:
140
        url += '#%s' % parts['fragment']
141
    return url
142
143
def _split_netloc(netloc):
    """Split *netloc* into username, password, hostname and port.

    Returns the named groups of NETLOC when it matches (groups absent from
    the input are None), or a dict of empty strings when it does not.
    """
    match = NETLOC.match(netloc)
    if match is None:
        return {'username': '', 'password': '', 'hostname': '', 'port': ''}
    return match.groupdict()
148
149
def _normalize_scheme(scheme):
150
    return scheme.lower() or 'http'
151
152
def _normalize_port(port, scheme):
    """Return '' when *port* is the default port of *scheme*.

    Any other port value, including None or '', is returned unchanged.
    """
    is_default = scheme in DEFAULT_PORTS and DEFAULT_PORTS[scheme] == port
    return '' if is_default else port
156
157
def _normalize_percent_encoding(txt):
    """Normalize every percent-encoded octet in *txt*.

    An escape that encodes an unreserved character (ASCII letters, digits,
    '-', '_', '.', '~') is replaced by the character itself; every other
    escape is kept with its hexadecimal digits in upper case.
    """
    unreserved = u'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~'

    def repl(hexpair):
        char = unichr(int(hexpair.group(1), 16))
        if char in unreserved:
            return char
        return u'%%%s' % hexpair.group(1).upper()

    return PERCENT_ENCODING.sub(repl, txt)
164
165
def _normalize_hostname(hostname):
    """Lower-case *hostname*, drop one trailing dot, and canonicalize
    numeric IP addresses.

    A hostname matched by NUMERIC_IP (one to four fields written in decimal,
    0-prefixed octal, or 0x-prefixed hexadecimal) is rewritten as a base-10
    dotted quad.  Every field but the last supplies one octet; the last
    field fills all remaining low-order octets, so '127.1' becomes
    '127.0.0.1'.  A hostname with a field that is not valid in its base
    (such as '08') is returned without IP conversion.
    """
    hostname = hostname.lower()
    if hostname.endswith('.'):
        hostname = hostname[:-1]
    match = NUMERIC_IP.match(hostname)
    if match is None:
        return hostname

    fields = [field for field in match.groups() if field]
    last = len(fields) - 1
    address = 0
    for index, field in enumerate(fields):
        if field.startswith('0x'):
            base = 16
        elif field.startswith('0'):
            base = 8
        else:
            base = 10
        try:
            value = int(field, base)
        except ValueError:
            # e.g. '08' or '09' is not octal; do not treat it as an IP.
            return hostname
        if index == last:
            # The final field spans every octet not yet supplied.
            address += value & (256 ** (4 - index) - 1)
        else:
            address += (value & 255) << (8 * (3 - index))
    return u'.'.join(
        [u'%d' % ((address >> shift) & 255) for shift in (24, 16, 8, 0)]
    )
183
184
def _normalize_path(path):
185
    path = path.split('/')
186
    endslash = False
187
    if path[-1] == '':
188
        endslash = True
189
    path = filter(None, path)
190
    pos = 0
191
    for i in range(len(path)):
192
        if path[i] == '.':
193
            path[i] = None
194
        elif path[i] == '..':
195
            path[pos] = None
196
            if pos > 0:
197
                pos -= 1
198
            path[i] = None
199
        elif path[i]:
200
            path[pos] = path[i]
201
            if pos < i:
202
                path[i] = None
203
            pos += 1
204
    path.insert(0, '')
205
    if endslash:
206
        path.append('')
207
    return '/'.join(filter(lambda x: x is not None, path)) or '/'
208
209
def _split_query(query):
210
    # The following code's basic logic was found in the Python 2.6
211
    # urlparse library, but was modified due to differing needs
212
    ret = {}
213
    queries = [j for i in query.split('&') for j in i.split(';')]
214
    if queries == ['']:
215
        return ret
216
    for q in queries:
217
        nv = q.split('=', 1)
218
        if len(nv) == 1:
219
            # Differentiate between `?n=` and ?n`
220
            nv.append(None)
221
        ret.setdefault(nv[0], []).append(nv[1])
222
    return ret
223
224
def _join_query(qdict):
    """Serialize the name -> list-of-values dict *qdict* as a query string.

    Names are emitted in sorted order and each name's values in sorted order
    (None first).  A None value yields 'name', an empty value 'name=', any
    other value 'name=value'.  Characters matched by
    UNACCEPTABLE_QUERY_CHARS are percent-encoded with two upper-case
    hexadecimal digits, except that a '%' which already starts a valid
    escape is kept so existing escapes are not encoded a second time.
    """
    hexdigits = '0123456789abcdefABCDEF'

    def replace(s):
        char = s.group(1)
        if char == '%':
            following = s.string[s.end():s.end() + 2]
            if (len(following) == 2 and following[0] in hexdigits
                    and following[1] in hexdigits):
                # Already a percent-escape; leave it intact.
                return char
        # NOTE(review): code points above 0xFF still yield more than two
        # hex digits, as before; confirm whether UTF-8 encoding is wanted.
        return u'%%%02X' % ord(char)

    def quote(text):
        return re.sub(UNACCEPTABLE_QUERY_CHARS, replace, text)

    def order(value):
        # Sort None ahead of every string without comparing the two types.
        return (value is not None, value or '')

    pairs = []
    for k in sorted(qdict.keys()):
        name = quote(k)
        for v in sorted(qdict[k], key=order):
            if v is None:
                pairs.append(name)
            elif not v:
                pairs.append('%s=' % name)
            else:
                pairs.append('%s=%s' % (name, quote(v)))
    return '&'.join(pairs)