summaryrefslogtreecommitdiff
path: root/squaredeal/sqd.py
diff options
context:
space:
mode:
authoremkael <emkael@tlen.pl>2024-01-22 00:55:26 +0100
committeremkael <emkael@tlen.pl>2024-01-22 00:55:26 +0100
commitf135d7573e0416dfed94a31c432de907efa77644 (patch)
treeb288c20c5f7012df670db72dcf593378d3cf508c /squaredeal/sqd.py
parenta3a75ede6d5b1a471ae16465df48863fe579f840 (diff)
Renaming classes, part 1
Diffstat (limited to 'squaredeal/sqd.py')
-rw-r--r--squaredeal/sqd.py233
1 files changed, 233 insertions, 0 deletions
diff --git a/squaredeal/sqd.py b/squaredeal/sqd.py
new file mode 100644
index 0000000..a2768bb
--- /dev/null
+++ b/squaredeal/sqd.py
@@ -0,0 +1,233 @@
+import base64, hashlib, os, random, re, shutil, string, subprocess
+
+
+def generate_session_key():
+ return ''.join(random.choices(string.ascii_letters + string.digits, k=60))
+
+def parse_range_str(range_str, max_count):
+ range_start = 0
+ range_end = max_count
+ if range_str:
+ try:
+ range_start = int(range_str) - 1
+ range_end = range_start + 1
+ except ValueError:
+ range_match = re.match(r'([0-9]+)-([0-9]+)', range_str)
+ if range_match:
+ range_start = int(range_match.group(1))-1
+ range_end = int(range_match.group(2))
+ else:
+ raise ValueError('Invalid range string: %s' % (range_str))
+ if range_start < 0:
+ raise ValueError('Value out of range: 0')
+ if range_end > max_count:
+ raise ValueError('Value out of range: %d' % (range_end))
+ return range(range_start, range_end)
+
+
+def validate_board_range_str(range_str):
+ if range_str.isdigit():
+ return range_str
+ if re.match(r'^[0-9]+-[0-9]+$', range_str):
+ return range_str
+ raise ValueError('Invalid board range definition: %s' % (range_str))
+
+
+class SquareDealError(Exception):
+ pass
+
+
+class SquareDealPhase(object):
+ def __init__(self):
+ self.sessions = 0
+ self.boards = 0
+ self.prefix = '#'
+ self.info = ''
+ self.s_keys = []
+
+ def fromstring(self, phasestr):
+ parts = phasestr.split(':')
+ if len(parts) != 4:
+ raise SquareDealError('Malformed phase definition: %s' % (phasestr))
+ self.sessions = int(parts[0])
+ self.boards = parts[1]
+ self.prefix = parts[2]
+ self.info = parts[3]
+
+ def tostring(self):
+ return ':'.join([str(self.sessions), str(self.boards), self.prefix, self.info or ''])
+
+ def _output_file_name(self, session, reserve=False):
+ prefix = self.prefix
+ session_search = re.findall(r'#+', prefix)
+ for session_match in sorted(session_search, reverse=True):
+ session_str = ('%0'+str(len(session_match))+'d') % (session)
+ prefix = prefix.replace(session_match, session_str)
+ if reserve:
+ prefix += 'reserve'
+ return prefix
+
+ def _parse_board_ranges(self, range_def):
+ ranges = [range_str.strip() for range_str in range_def.split(',')]
+ for range_str in ranges:
+ validate_board_range_str(range_str)
+ output_ranges = []
+ while len(output_ranges) < self.sessions:
+ output_ranges += ranges
+ return output_ranges[0:self.sessions]
+
+ def generate(self, session, delayed_info, reserve=False, output_path=None):
+ if not SquareDeal.BIGDEALX_PATH:
+ raise SquareDealError('Path to BigDeal is not set, initialize SquareDeal.BIGDEALX_PATH value')
+ delayed_info = base64.b64encode(delayed_info.encode('utf-8')).decode()
+ sessions_to_generate = parse_range_str(session, self.sessions)
+ board_ranges = self._parse_board_ranges(self.boards)
+ for session in sessions_to_generate:
+ session_key = self.s_keys[session]
+ session_key_len = int(len(session_key)/2)
+ session_left = session_key[0:session_key_len]
+ session_right = session_key[session_key_len:]
+ reserve_info = 'reserve' if reserve else 'original'
+ args = [SquareDeal.BIGDEALX_PATH,
+ '-W', session_left,
+ '-e', session_right,
+ '-e', delayed_info,
+ '-e', reserve_info,
+ '-p', self._output_file_name(session+1, reserve),
+ '-n', board_ranges[session]]
+ try:
+ subprocess.run(args, cwd=output_path, capture_output=True, check=True)
+ except subprocess.CalledProcessError as ex:
+ raise SquareDealError(ex.stderr)
+
+
+
+class SquareDeal(object):
+
+ BIGDEALX_PATH=None
+
+ def __init__(self):
+ self.name = ''
+ self.delayed_info = ''
+ self.delayed_value = ''
+ self.hash = ''
+ self.phases = []
+ self.published = False
+ self.sqd_path = None
+
+ def fromfile(self, sqdpath, sqkpath=None, encoding='utf-8'):
+ with open(sqdpath, encoding=encoding) as sqdfile:
+ contents = [line.strip() for line in sqdfile.readlines()]
+ for idx, line in enumerate(contents):
+ if line.startswith('#'):
+ continue
+ linetype, _, linecontents = line.partition(' ')
+ if linetype == 'TN':
+ self.name = linecontents
+ elif linetype == 'DI':
+ self.delayed_info = linecontents
+ elif linetype == 'DV':
+ self.delayed_value = linecontents
+ elif linetype == 'KH':
+ self.hash = linecontents
+ self.published = True
+ elif linetype == 'SN':
+ phase = SquareDealPhase()
+ phase.fromstring(linecontents)
+ self.phases.append(phase)
+ else:
+ raise SquareDealError('Unrecognized tag %s on line %d' % (linetype, idx))
+ if self.published:
+ for phase in self.phases:
+ phase.s_keys = [None] * phase.sessions
+ if sqkpath is None:
+ sqkpath = self._deduce_sqk_path(sqdpath)
+ try:
+ with open(sqkpath, encoding=encoding) as sqkfile:
+ contents = [line.strip() for line in sqkfile.readlines()]
+ except FileNotFoundError:
+ raise SquareDealError('Unable to locate SQK file for %s' % (sqdpath))
+ for line in contents:
+ if not line.strip():
+ continue
+ lineparts = line.split(':')
+ if len(lineparts) != 2:
+ raise SquareDealError('Malformed SQK line: %s' % (line))
+ session = lineparts[0].split(',')
+ if len(session) != 2:
+ raise SquareDealError('Malformed SQK line: %s' % (line))
+ phase_no = int(session[0])
+ session_no = int(session[1])
+ try:
+ self.phases[phase_no-1].s_keys[session_no-1] = lineparts[1]
+ except IndexError:
+ raise SquareDealError(
+ 'Session %s from SQK not declared in SQD' % (lineparts[0]))
+ for ph_idx, phase in enumerate(self.phases):
+ for s_idx, s_key in enumerate(phase.s_keys):
+ if s_key is None:
+ raise SquareDealError(
+ 'Session %d,%d missing a key in SQK' % (ph_idx+1, s_idx+1))
+ sqk_hash = self._get_file_hash(sqkpath)
+ if sqk_hash != self.hash:
+ raise SquareDealError(
+ 'SQK hash mismtach: %s in SQD, % actual' % (self.hash, sqk_hash))
+ self.sqd_path = sqdpath
+
+ def _deduce_sqk_path(self, sqdpath):
+ sqkpath = list(os.path.splitext(sqdpath))
+ sqkpath[-1] = '.sqk'
+ return ''.join(sqkpath)
+
+ def _get_file_hash(self, path):
+ with open(path, 'rb') as hashed_file:
+ hash = hashlib.sha256()
+ while True:
+ chunk = hashed_file.read(1024)
+ if not chunk:
+ break
+ hash.update(chunk)
+ return hash.hexdigest()
+
+ def _make_backups(self, sqdpath, sqkpath):
+ for f in [sqdpath, sqkpath]:
+ if os.path.exists(f):
+ shutil.copy(f, f + '.bak')
+
+ def _write_session_keys(self, sqkpath):
+ with open(sqkpath, 'wb') as sqkfile:
+ for ph_idx, phase in enumerate(self.phases):
+ for s_idx, session_key in enumerate(phase.s_keys):
+ if session_key is None:
+ raise SquareDealError(
+ 'Missing session key for session %d,%d' % (ph_idx+1, s_idx+1))
+ sqkfile.write(
+ ('%d,%d:%s\r\n' % (ph_idx+1, s_idx+1, session_key)).encode('utf8'))
+ self.hash = self._get_file_hash(sqkpath)
+
+ def tofile(self, sqdpath, sqkpath=None, make_backups=True):
+ if sqkpath is None:
+ sqkpath = self._deduce_sqk_path(sqdpath)
+ if make_backups:
+ self._make_backups(sqdpath, sqkpath)
+ if self.published:
+ self._write_session_keys(sqkpath)
+ sqd_contents = []
+ sqd_contents.append('TN %s\n' % (self.name or ''))
+ sqd_contents.append('DI %s\n' % (self.delayed_info or ''))
+ if self.published:
+ sqd_contents.append('DV %s\n' % (self.delayed_value or ''))
+ for phase in self.phases:
+ sqd_contents.append('SN %s\n' % (phase.tostring()))
+ if self.published:
+ sqd_contents.append('KH %s\n' % (self.hash))
+ with open(sqdpath, 'w') as sqdfile:
+ sqdfile.writelines(sqd_contents)
+
+ def generate(self, phase, session, reserve=False):
+ phases_to_generate = parse_range_str(phase, len(self.phases))
+ for phase in phases_to_generate:
+ self.phases[phase].generate(
+ session, self.delayed_value,
+ reserve=reserve,
+ output_path=os.path.realpath(os.path.dirname(self.sqd_path)) if self.sqd_path else None)