diff options
author | emkael <emkael@tlen.pl> | 2024-01-22 00:55:26 +0100 |
---|---|---|
committer | emkael <emkael@tlen.pl> | 2024-01-22 00:55:26 +0100 |
commit | f135d7573e0416dfed94a31c432de907efa77644 (patch) | |
tree | b288c20c5f7012df670db72dcf593378d3cf508c /squaredeal/sqd.py | |
parent | a3a75ede6d5b1a471ae16465df48863fe579f840 (diff) |
Renaming classes, part 1
Diffstat (limited to 'squaredeal/sqd.py')
-rw-r--r-- | squaredeal/sqd.py | 233 |
1 files changed, 233 insertions, 0 deletions
diff --git a/squaredeal/sqd.py b/squaredeal/sqd.py new file mode 100644 index 0000000..a2768bb --- /dev/null +++ b/squaredeal/sqd.py @@ -0,0 +1,233 @@ +import base64, hashlib, os, random, re, shutil, string, subprocess + + +def generate_session_key(): + return ''.join(random.choices(string.ascii_letters + string.digits, k=60)) + +def parse_range_str(range_str, max_count): + range_start = 0 + range_end = max_count + if range_str: + try: + range_start = int(range_str) - 1 + range_end = range_start + 1 + except ValueError: + range_match = re.match(r'([0-9]+)-([0-9]+)', range_str) + if range_match: + range_start = int(range_match.group(1))-1 + range_end = int(range_match.group(2)) + else: + raise ValueError('Invalid range string: %s' % (range_str)) + if range_start < 0: + raise ValueError('Value out of range: 0') + if range_end > max_count: + raise ValueError('Value out of range: %d' % (range_end)) + return range(range_start, range_end) + + +def validate_board_range_str(range_str): + if range_str.isdigit(): + return range_str + if re.match(r'^[0-9]+-[0-9]+$', range_str): + return range_str + raise ValueError('Invalid board range definition: %s' % (range_str)) + + +class SquareDealError(Exception): + pass + + +class SquareDealPhase(object): + def __init__(self): + self.sessions = 0 + self.boards = 0 + self.prefix = '#' + self.info = '' + self.s_keys = [] + + def fromstring(self, phasestr): + parts = phasestr.split(':') + if len(parts) != 4: + raise SquareDealError('Malformed phase definition: %s' % (phasestr)) + self.sessions = int(parts[0]) + self.boards = parts[1] + self.prefix = parts[2] + self.info = parts[3] + + def tostring(self): + return ':'.join([str(self.sessions), str(self.boards), self.prefix, self.info or '']) + + def _output_file_name(self, session, reserve=False): + prefix = self.prefix + session_search = re.findall(r'#+', prefix) + for session_match in sorted(session_search, reverse=True): + session_str = ('%0'+str(len(session_match))+'d') % (session) + prefix = prefix.replace(session_match, session_str) + if reserve: + prefix += 'reserve' + return prefix + + def _parse_board_ranges(self, range_def): + ranges = [range_str.strip() for range_str in range_def.split(',')] + for range_str in ranges: + validate_board_range_str(range_str) + output_ranges = [] + while len(output_ranges) < self.sessions: + output_ranges += ranges + return output_ranges[0:self.sessions] + + def generate(self, session, delayed_info, reserve=False, output_path=None): + if not SquareDeal.BIGDEALX_PATH: + raise SquareDealError('Path to BigDeal is not set, initialize SquareDeal.BIGDEALX_PATH value') + delayed_info = base64.b64encode(delayed_info.encode('utf-8')).decode() + sessions_to_generate = parse_range_str(session, self.sessions) + board_ranges = self._parse_board_ranges(self.boards) + for session in sessions_to_generate: + session_key = self.s_keys[session] + session_key_len = int(len(session_key)/2) + session_left = session_key[0:session_key_len] + session_right = session_key[session_key_len:] + reserve_info = 'reserve' if reserve else 'original' + args = [SquareDeal.BIGDEALX_PATH, + '-W', session_left, + '-e', session_right, + '-e', delayed_info, + '-e', reserve_info, + '-p', self._output_file_name(session+1, reserve), + '-n', board_ranges[session]] + try: + subprocess.run(args, cwd=output_path, capture_output=True, check=True) + except subprocess.CalledProcessError as ex: + raise SquareDealError(ex.stderr) + + + +class SquareDeal(object): + + BIGDEALX_PATH=None + + def __init__(self): + self.name = '' + self.delayed_info = '' + self.delayed_value = '' + self.hash = '' + self.phases = [] + self.published = False + self.sqd_path = None + + def fromfile(self, sqdpath, sqkpath=None, encoding='utf-8'): + with open(sqdpath, encoding=encoding) as sqdfile: + contents = [line.strip() for line in sqdfile.readlines()] + for idx, line in enumerate(contents): + if line.startswith('#'): + continue + linetype, _, linecontents = line.partition(' ') + if linetype == 'TN': + self.name = linecontents + elif linetype == 'DI': + self.delayed_info = linecontents + elif linetype == 'DV': + self.delayed_value = linecontents + elif linetype == 'KH': + self.hash = linecontents + self.published = True + elif linetype == 'SN': + phase = SquareDealPhase() + phase.fromstring(linecontents) + self.phases.append(phase) + else: + raise SquareDealError('Unrecognized tag %s on line %d' % (linetype, idx)) + if self.published: + for phase in self.phases: + phase.s_keys = [None] * phase.sessions + if sqkpath is None: + sqkpath = self._deduce_sqk_path(sqdpath) + try: + with open(sqkpath, encoding=encoding) as sqkfile: + contents = [line.strip() for line in sqkfile.readlines()] + except FileNotFoundError: + raise SquareDealError('Unable to locate SQK file for %s' % (sqdpath)) + for line in contents: + if not line.strip(): + continue + lineparts = line.split(':') + if len(lineparts) != 2: + raise SquareDealError('Malformed SQK line: %s' % (line)) + session = lineparts[0].split(',') + if len(session) != 2: + raise SquareDealError('Malformed SQK line: %s' % (line)) + phase_no = int(session[0]) + session_no = int(session[1]) + try: + self.phases[phase_no-1].s_keys[session_no-1] = lineparts[1] + except IndexError: + raise SquareDealError( + 'Session %s from SQK not declared in SQD' % (lineparts[0])) + for ph_idx, phase in enumerate(self.phases): + for s_idx, s_key in enumerate(phase.s_keys): + if s_key is None: + raise SquareDealError( + 'Session %d,%d missing a key in SQK' % (ph_idx+1, s_idx+1)) + sqk_hash = self._get_file_hash(sqkpath) + if sqk_hash != self.hash: + raise SquareDealError( + 'SQK hash mismtach: %s in SQD, % actual' % (self.hash, sqk_hash)) + self.sqd_path = sqdpath + + def _deduce_sqk_path(self, sqdpath): + sqkpath = list(os.path.splitext(sqdpath)) + sqkpath[-1] = '.sqk' + return ''.join(sqkpath) + + def _get_file_hash(self, path): + with open(path, 'rb') as hashed_file: + hash = hashlib.sha256() + while True: + chunk = hashed_file.read(1024) + if not chunk: + break + hash.update(chunk) + return hash.hexdigest() + + def _make_backups(self, sqdpath, sqkpath): + for f in [sqdpath, sqkpath]: + if os.path.exists(f): + shutil.copy(f, f + '.bak') + + def _write_session_keys(self, sqkpath): + with open(sqkpath, 'wb') as sqkfile: + for ph_idx, phase in enumerate(self.phases): + for s_idx, session_key in enumerate(phase.s_keys): + if session_key is None: + raise SquareDealError( + 'Missing session key for session %d,%d' % (ph_idx+1, s_idx+1)) + sqkfile.write( + ('%d,%d:%s\r\n' % (ph_idx+1, s_idx+1, session_key)).encode('utf8')) + self.hash = self._get_file_hash(sqkpath) + + def tofile(self, sqdpath, sqkpath=None, make_backups=True): + if sqkpath is None: + sqkpath = self._deduce_sqk_path(sqdpath) + if make_backups: + self._make_backups(sqdpath, sqkpath) + if self.published: + self._write_session_keys(sqkpath) + sqd_contents = [] + sqd_contents.append('TN %s\n' % (self.name or '')) + sqd_contents.append('DI %s\n' % (self.delayed_info or '')) + if self.published: + sqd_contents.append('DV %s\n' % (self.delayed_value or '')) + for phase in self.phases: + sqd_contents.append('SN %s\n' % (phase.tostring())) + if self.published: + sqd_contents.append('KH %s\n' % (self.hash)) + with open(sqdpath, 'w') as sqdfile: + sqdfile.writelines(sqd_contents) + + def generate(self, phase, session, reserve=False): + phases_to_generate = parse_range_str(phase, len(self.phases)) + for phase in phases_to_generate: + self.phases[phase].generate( + session, self.delayed_value, + reserve=reserve, + output_path=os.path.realpath(os.path.dirname(self.sqd_path)) if self.sqd_path else None) |