Source code for pywind.ofgem.form_data

# coding=utf-8

# This is free and unencumbered software released into the public domain.
#
# Anyone is free to copy, modify, publish, use, compile, sell, or
# distribute this software, either in source code form or as a compiled
# binary, for any purpose, commercial or non-commercial, and by any
# means.

# In jurisdictions that recognize copyright laws, the author or authors
# of this software dedicate any and all copyright interest in the
# software to the public domain. We make this dedication for the benefit
# of the public at large and to the detriment of our heirs and
# successors. We intend this dedication to be an overt act of
# relinquishment in perpetuity of all present and future rights to this
# software under copyright law.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# For more information, please refer to <http://unlicense.org/>

""" Each Ofgem web form contains a lot of information. Classes in this file try to
    make managing the data easier.
"""
from __future__ import print_function

import logging
import html5lib

import re
from pprint import pprint

import sys


def element_attributes(elm):
    """ Collect the basic attributes we want from an XML element.

    :param elm: element exposing a ``tag`` attribute and a ``get()`` method
    :returns: dict with the keys tag, type, name, value, readonly and disabled
    :rtype: dict
    """
    # Attribute name paired with the fallback used when it is absent.
    wanted = (
        ('type', None),
        ('name', None),
        ('value', ''),
        ('readonly', False),
        ('disabled', False),
    )
    attrs = {'tag': elm.tag}
    for attr_name, fallback in wanted:
        attrs[attr_name] = elm.get(attr_name, fallback)
    return attrs
def selected_list(element):
    """ Given an element dict, return a list of the selected indexes.

    If the element carries a ``selected`` entry it is returned unchanged.
    Otherwise the comma separated ``value`` string is converted into a list
    of integers.  Blank parts (including a completely empty or missing
    value) are skipped, so an element with nothing selected gives ``[]``
    rather than raising ``ValueError``.

    :param element: element dict as stored by FormData
    :returns: list of selected indexes
    :rtype: list
    :raises ValueError: if a non-blank part of ``value`` is not an integer
    """
    if 'selected' in element:
        return element['selected']
    raw_value = element.get('value') or ''
    # int() tolerates surrounding whitespace, so only blank parts need filtering.
    return [int(part) for part in raw_value.split(',') if part.strip()]
def quote(toquote):
    """quote('abc def') -> 'abc%20def'

    Percent-encode a string for use in a URL or in form POST data.

    Only ASCII letters, digits and the characters ``_``, ``.`` and ``-`` are
    left untouched.  Every other character, including all of the RFC 2396
    reserved characters

    reserved    = ";" | "/" | "?" | ":" | "@" | "&" | "=" | "+" |
                  "$" | ","

    is replaced by the ``%XX`` escapes of its UTF-8 encoding.  Note that,
    unlike :func:`urllib.parse.quote`, '/' IS encoded.

    :param toquote: text (or bytes) to quote
    :returns: the quoted string; falsy inputs other than None are returned
              unchanged
    :raises TypeError: if toquote is None or is not text/bytes
    """
    # fastpath
    if not toquote:
        if toquote is None:
            raise TypeError('None object cannot be quoted')
        return toquote

    if isinstance(toquote, bytes):
        raw = toquote
    elif hasattr(toquote, 'encode'):
        # Encoding first means every code point, not just those below
        # U+00BF, ends up in the output as its UTF-8 byte sequence.
        raw = toquote.encode('utf-8')
    else:
        raise TypeError('{} object cannot be quoted'.format(type(toquote).__name__))

    always_safe = frozenset(bytearray(b'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
                                      b'abcdefghijklmnopqrstuvwxyz'
                                      b'0123456789_.-'))
    # bytearray yields integers on both Python 2 and Python 3.
    return ''.join(chr(byte) if byte in always_safe else '%{:02X}'.format(byte)
                   for byte in bytearray(raw))
class FormData(object):
    """ Class to store and allow easy manipulation of data from an Ofgem form.

    The public attributes are

    - ``action`` / ``method``: taken from the parsed ``<form>`` element
    - ``export_url``: set when a delta response contains an ExportUrlBase
    - ``elements``: dict of element name -> dict describing the element
    - ``labels``: dict of label text -> element name
    - ``postbacks``: dict of element name -> True where a change of value
      requires a postback
    - ``seperators``: dict of element name -> list separator string
    """

    def __init__(self, initial_data="", stored_file=None):
        """ Create the form data, optionally parsing some initial content.

        :param initial_data: HTML content to parse (ignored if stored_file is given)
        :param stored_file: path of a file containing the HTML content to parse
        """
        self.action = None
        self.method = None
        self.export_url = None
        self.logger = logging.getLogger(__name__)
        self.labels = {}
        self.elements = {}
        self.postbacks = {}
        self.seperators = {}

        if stored_file is not None:
            self.logger.debug("Initialising FormData from %s", stored_file)
            with open(stored_file, "r") as ofh:
                self._parse(ofh.read())
        elif len(initial_data) > 0:
            self.logger.debug("%d bytes of initial data supplied for FormData", len(initial_data))
            self._parse(initial_data)

        # These are always (re)set, replacing anything parsed from the form.
        self._add_element('__EVENTARGUMENT', value='')
        self._add_element('__ASYNCPOST', value='true')
        self._add_element('__LASTFOCUS', value='')
        self._add_element('__EVENTTARGET', value='')

    def update(self, content=""):
        """ Given some content, update the form.

        The content may be a complete HTML page or a "delta" response (which
        is recognised by a pipe symbol within the first 8 characters).

        :param content: The content to update the form from (text or bytes)
        :returns: True or False
        :rtype: bool
        """
        content = content.strip()
        # Only bytes need decoding; text (including the default "") has no
        # decode() method on Python 3.
        if sys.version_info >= (3, 0) and isinstance(content, bytes):
            content = content.decode()
        if len(content) == 0:
            return False
        if '|' in content[:8]:
            # we have been sent a delta response...
            return self._parse_delta_content(content)
        return self._parse(content)

    def set_value_by_label(self, lbl, value):
        """ Set a value based on a label.

        The label is matched without regard to case.

        :param lbl: The label text of the element to change
        :param value: The value to set
        :returns: Tuple of (value was set, postback is needed)
        :rtype: tuple
        """
        el_name = self._find_label(lbl)
        if el_name is None:
            self.logger.info("Unable to find label matching %s", lbl)
            return False, False
        self.logger.debug("Found label %s", lbl)
        return self._set_value_by_name(el_name, value)

    def as_post_data(self, quoted=True, submit=False):
        """ Process the form elements and return in a dict suitable for using
        as POST data.

        :param quoted: If set the returned data will be fully quoted.
        :param submit: True only if this is a submission post.
        :returns: Dict of data to be posted as name: value pairs
        :rtype: dict
        """
        post_data = {}
        for name in sorted(self.elements):
            # Of the divDropDown elements only the HiddenIndices one is posted.
            if 'divDropDown' in name and 'HiddenIndices' not in name:
                continue
            element = self.elements[name]
            if submit is False and element.get('type', '') == 'submit':
                continue
            if 'cbNull' in name and element['checked'] is False:
                continue
            post_data[name] = self._get_post_value(name, element)

        if quoted:
            return {quote(key): quote(post_data[key]) for key in sorted(post_data)}
        return post_data

    def value_for_label(self, lbl):
        """ Return the value that would be posted for the element with the given label.

        An exact match of the label is preferred, falling back to a match
        that ignores case.

        :param lbl: The label text
        :returns: The post value of the labelled element
        :raises KeyError: if no label matches
        """
        el_name = self._find_label(lbl)
        if el_name is None:
            raise KeyError("Label {} does not exist".format(lbl))
        return self._get_post_value(el_name)

    ### Private functions below

    def __contains__(self, item):
        return item in self.elements

    def __setitem__(self, key, key_value):
        # A dict is stored as the element itself, anything else as its value.
        if not isinstance(key_value, dict):
            self._add_element(key, value=key_value)
        else:
            self._add_element(key, dict=key_value)

    def __getitem__(self, item):
        if item in self.elements:
            return self.elements[item]
        raise KeyError(item)

    def _find_label(self, lbl):
        """ Return the element name for a label, or None if there is no match.

        An exact match wins, otherwise the first label equal when compared
        in lower case is used.
        """
        if lbl in self.labels:
            return self.labels[lbl]
        wanted = lbl.lower()
        for key, el_name in self.labels.items():
            if key.lower() == wanted:
                return el_name
        return None

    def _add_element(self, el_name=None, **kwargs):
        """ Add an element to the class. This allows for direct setting by using
            passing in a keyword parameter of dict={...}. el_name is required
            unless a name keyword is given or the dict passed contains a name.

        :raises KeyError: if no name can be determined
        """
        if el_name is None:
            if 'name' in kwargs:
                el_name = kwargs['name']
            else:
                el_name = kwargs['dict']['name']
        if 'dict' in kwargs:
            # Stored by reference, not copied.
            self.elements[el_name] = kwargs['dict']
            return
        self.elements[el_name] = dict(kwargs)

    def _set_value_by_name(self, name, value):
        """ Set the value of the named element.

        For select elements the value may be an option key (an int is also
        tried as its string form, as parsed option keys are strings) or the
        text of an option, matched without regard to case.

        :returns: Tuple of (value was set, postback is needed)
        """
        element = self.elements[name]
        if element.get('tag') == 'select':
            options = element['options']
            sel = None
            if isinstance(value, int):
                if value in options:
                    sel = value
                elif str(value) in options:
                    sel = str(value)
                else:
                    self.logger.info("Unable to set %s to %s [not in options]", name, value)
                    self.logger.debug("Available options: %s", options)
                    return False, False
            else:
                for opt, opt_text in options.items():
                    if opt_text.lower() == value.lower():
                        sel = opt
                        break
            if sel is None:
                self.logger.info("Unable to find a matching option for %s", value)
                return False, False
            if sel in element['selected']:
                # Already selected, so nothing changes and no postback is needed.
                return True, False
            element['selected'] = [sel]
            return True, self._postback_needed(name)

        if 'value' in element:
            element['value'] = value
            self._check_set_dropdown(name, value)
            if element.get('checkbox'):
                # Setting a value means the related "null" checkbox is cleared.
                ckbox = self.elements[name.replace('txtValue', 'cbNull')]
                ckbox['checked'] = False
                return True, self._postback_needed(ckbox['name']) or self._postback_needed(name)
            return True, self._postback_needed(name)
        return False, False

    def _check_set_dropdown(self, name, value):
        """ If the named element has a related divDropDown group, record the
        indexes of the entries whose label equals value in the group's
        HiddenIndices element.

        :returns: True if the HiddenIndices element was updated, otherwise False
        """
        dd = name.replace('txtValue', 'divDropDown$ctl00')
        if dd not in self.elements:
            return False
        prefix = dd[:-2]
        options = []
        idx_el = None
        for poss in self.elements:
            if prefix in poss:
                options.append(poss)
                if 'HiddenIndices' in poss:
                    idx_el = poss
        if idx_el is None:
            return False
        # The first two sorted names (ctl00 and ctl01$HiddenIndices in the
        # names used here) are skipped, so index n refers to the (n + 2)th name.
        idxs = [str(n) for n, opt_name in enumerate(sorted(options)[2:])
                if self.elements[opt_name].get('label') == value]
        self.elements[idx_el]['value'] = ",".join(idxs)
        return True

    def _postback_needed(self, name):
        """ If a postback is needed, set things up and return True. """
        if self.postbacks.get(name, False):
            self.elements['ScriptManager1'] = {'value': 'ScriptManager1|{}'.format(name)}
            self.elements['__EVENTTARGET'] = {'value': name}
            return True
        return False

    def _parse(self, content):
        """ Parse a complete HTML page.

        :returns: True if a form with an id of form1 was found and processed
        """
        document = html5lib.parse(content, treebuilder="lxml", namespaceHTMLElements=False)
        self._parse_scripts(document)
        forms = document.xpath('*//form[@id="form1"]')
        if len(forms) == 0:
            self.logger.info("No form with an id of 'form1' found in supplied data.")
            return False
        self._parse_form(forms[0])
        return True

    def _parse_form(self, form_root):
        """ If we have a complete form, process it. """
        self.action = form_root.get('action')
        self.method = form_root.get('method')
        self.logger.debug("Form: Action: %s", self.action)
        self.logger.debug("    : Method: %s", self.method)
        self._process_input(form_root)
        self._process_cbnull()
        self._process_select(form_root)
        self._process_labels(form_root)

    def _process_input(self, root):
        """ Add an element for each usable INPUT below root.

        Inputs with no type or name, image inputs and unchecked radio
        buttons are ignored.
        """
        self.logger.debug("Processing INPUT elements...")
        for elm in root.xpath('.//input'):
            inp_data = element_attributes(elm)
            if inp_data['type'] in (None, 'image'):
                continue
            if inp_data['name'] is None:
                # Nothing can be posted for, or looked up by, a nameless input.
                self.logger.debug("  - skipping input with no name")
                continue
            if 'cbNull' in inp_data['name']:
                inp_data['checked'] = elm.get('checked', '') == 'checked'
            if inp_data['type'] == 'radio' and elm.get('checked', '') != 'checked':
                continue
            self._add_element(None, **inp_data)
            self.logger.debug("  - adding %s", inp_data['name'])

    def _process_cbnull(self):
        """ Flag each txtValue element that has a matching cbNull element. """
        for key in self.elements:
            if not key.endswith('cbNull'):
                continue
            txt_name = key.replace('cbNull', 'txtValue')
            if txt_name in self.elements:
                self.elements[txt_name]['checkbox'] = True

    def _process_select(self, root):
        """ Add an element, with its options and selection, for each SELECT below root. """
        self.logger.debug("Processing SELECT elements...")
        for elm in root.xpath('.//select'):
            inp_data = element_attributes(elm)
            if inp_data['name'] is None:
                self.logger.debug("  - skipping select with no name")
                continue
            inp_data['selected'] = []
            options = {}
            for opt in elm.iterchildren():
                # An option without any text has a text of None.
                options[opt.get('value')] = (opt.text or '').strip()
                if opt.get('selected', '') == 'selected':
                    inp_data['selected'].append(opt.get('value'))
            inp_data['options'] = options
            self._add_element(None, **inp_data)
            self.logger.debug("  - adding %s with %d options", inp_data['name'], len(options))

    def _process_labels(self, root):
        """ Record the labels found in the table cells below root.

        Labels for txtValue and ddValue elements are stored in self.labels,
        labels for rbTrue/rbFalse elements are ignored and any other label
        is stored as the 'label' of the element it refers to.  Labels with
        no text or no 'for' attribute are skipped.
        """
        self.logger.debug("Processing labels...")
        for elm in root.xpath('*//tr/td//label'):
            if len(elm) == 0:
                txt = elm.text
            else:
                txt_nodes = elm.xpath('./span/font')
                if len(txt_nodes) == 0:
                    continue
                txt = txt_nodes[0].text
                if txt is not None:
                    txt = txt.strip().replace(':', '')
            target = elm.get('for')
            if txt is None or target is None:
                continue
            txt = txt.strip()
            # The 'for' attribute uses _ where the element names use $.
            name = target.replace('_', '$')
            if 'rbTrue' in name or 'rbFalse' in name:
                continue
            if 'txtValue' not in name and 'ddValue' not in name:
                elem = self.elements.get(name, None)
                if elem is None:
                    self.logger.info("Unable to find an element to label : %s", name)
                    continue
                elem['label'] = txt
                continue
            self.labels[txt] = name
            self.logger.debug("  - adding %s for %s", txt, name)

    def _parse_scripts(self, root):
        """ Look for callback information. """
        for scr in root.xpath('*//script'):
            if scr.text is None or 'Sys.Application' not in scr.text:
                continue
            for jss in re.findall(r"Sys.Application.add_init\(function\(\) \{\n(.*)\n\}\);", scr.text):
                tid = re.search(r'\"(DropDownId|TextBoxId|FalseCheckId|NullCheckBoxId)\":\"(.*?)\",', jss)
                if tid is None:
                    continue
                name = tid.group(2).replace('_', '$')
                if '"PostBackOnChange":true' in jss:
                    self.postbacks[name] = True
                lss = re.search('\"ListSeparator\":\"(.*?)\",', jss)
                if lss is not None:
                    self.seperators[name] = lss.group(1)

    def _split_delta_records(self, content):
        """ Split delta content into a list of [type, name, payload] lists.

        :returns: The list of records, or None if the content is invalid
        """
        components = []
        total = len(content)
        pos = 0
        while pos < total:
            header = []
            for _ in range(3):
                end = content.find('|', pos)
                if end == -1:
                    break
                header.append(content[pos:end])
                pos = end + 1
            if len(header) < 3:
                # Incomplete trailing record, nothing more to collect.
                break
            self.logger.debug("%d: %s, %s, %s [@%d]", pos - 1, header[0], header[1], header[2], pos)
            try:
                length = int(header[0])
            except ValueError:
                self.logger.warning("Inavlid length detected while parsing delta content :-("
                                    "%s is not a valid length", header[0])
                return None
            if pos + length > total:
                self.logger.info("Content buffer is not long enough")
                break
            val = content[pos:pos + length]
            pos += length
            # A payload that runs exactly to the end of the buffer is accepted.
            if pos < total and content[pos] != '|':
                self.logger.info("Length appears wrong... Found %s instead of |", content[pos])
                # Small fudge factor if required
                for extra in range(1, 10):
                    if pos + extra < total and content[pos + extra] == '|':
                        val += content[pos:pos + extra]
                        pos += extra
                        self.logger.info("Length adjusted by %d bytes", extra)
                        break
                else:
                    self.logger.warning("Unable to recover from invalid length. Exiting")
                    return None
            components.append([header[1], header[2], val])
            pos += 1
        return components

    def _parse_delta_content(self, content):
        """ Function to parse the "delta" content that is returned. Each is a series
            of 4 elements seperated by a pipe symbol. The elements of the change appear to be
            - number of bytes for the "payload"
            - what the change relates to
            - field name or additional content information
            - change payload.

            The first element appears to always be 1|#||4|.

        :returns: True if the content was valid and has been applied
        :rtype: bool
        """
        components = self._split_delta_records(content)
        if components is None:
            return False

        if len(components) == 0 or components[0][0] != '#':
            self.logger.warning("Invalid delta response received.")
            return False

        if len(components) > 1 and components[1][0] == 'pageRedirect':
            self.logger.info("Redirect received. Something went wrong :-(")
            return False

        self.logger.debug("Processing delta update with %s components", len(components))

        for kind, field, payload in components[1:]:
            if kind == 'hiddenField':
                element = self.elements.get(field, None)
                if element is None:
                    self.elements[field] = {'value': payload}
                    self.logger.debug("  - created element %s", field)
                else:
                    element['value'] = payload
                    self.logger.debug("  - updated value for %s", field)
            elif kind == 'formAction':
                self.action = payload
                self.logger.debug("  - updated action URL to %s", self.action)
            elif 'script' in kind and 'ExportUrlBase' in payload:
                export_base = re.search('\"ExportUrlBase\":\"(.*?)\",', payload)
                if export_base is None:
                    continue
                self.export_url = export_base.group(1).replace('\\u0026', '&')
                self.logger.debug("  - found export url: %s", self.export_url)
        return True

    def _get_post_value(self, name, element=None):
        """ Return the value to post for the named element.

        :param name: The element name
        :param element: The element dict, looked up by name when omitted
        """
        if element is None:
            element = self.elements[name]
        if 'txtValue' in name:
            self.logger.debug("building string from selected options for %s", name)
            related = self._get_related_txt_element(name)
            if related is None:
                self.logger.info("Unable to find related select for %s", name)
                return ''
            return self._build_text_value(related, element)
        if 'ddValue' in name:
            return ",".join([str(idx) for idx in element['selected']])
        if 'cbNull' in name:
            return 'on' if element['checked'] else ''
        return element['value']

    def _get_related_txt_element(self, name):
        """ If we have a txtValue field then we need to find the element that
            contains the choices to build the required text. This could be a select
            element or one of the more complex multiple choice checkbox fields.

        :returns: The first related element found, or None
        """
        for poss in ('ddValue', 'cbNull', 'divDropDown$ctl01$HiddenIndices'):
            rel_el = self.elements.get(name.replace('txtValue', poss), None)
            if rel_el is not None:
                return rel_el
        return None

    def _build_text_value(self, element, original):
        """ Build the text to post for a txtValue element.

        :param element: The related element (see _get_related_txt_element)
        :param original: The txtValue element itself
        :returns: The text value
        """
        if 'cbNull' in element['name']:
            if element['checked']:
                return ''
            return original['value']

        sep = self.seperators.get(original['name'], ',')
        if element.get('tag') == 'select':
            return sep.join([element['options'][idx] for idx in selected_list(element)])

        components = []
        # Blank parts are skipped so that an empty value gives no components
        # while a single index (which contains no comma) is still used.
        for idx in element['value'].split(','):
            if not idx.strip():
                continue
            num_idx = int(idx) + 2
            val_name = element['name'].replace('01$HiddenIndices', "{:02d}".format(num_idx))
            elem = self.elements.get(val_name, None)
            if elem is None:
                self.logger.info("Unable to find a text value for %s -> %s", idx, val_name)
                continue
            components.append(elem.get('label', 'unknown'))
        return sep.join(components)