diff options
Diffstat (limited to 'src/bcdd')
-rw-r--r-- | src/bcdd/BCalcWrapper.py | 45 | ||||
-rw-r--r-- | src/bcdd/DDTable.py | 98 | ||||
-rw-r--r-- | src/bcdd/Exceptions.py | 18 | ||||
-rw-r--r-- | src/bcdd/PBNBoard.py | 242 | ||||
-rw-r--r-- | src/bcdd/PBNFile.py | 43 | ||||
-rw-r--r-- | src/bcdd/ParContract.py | 155 | ||||
-rw-r--r-- | src/bcdd/ParScore.py | 171 | ||||
-rw-r--r-- | src/bcdd/__init__.py | 0 |
8 files changed, 772 insertions, 0 deletions
diff --git a/src/bcdd/BCalcWrapper.py b/src/bcdd/BCalcWrapper.py new file mode 100644 index 0000000..a6bed27 --- /dev/null +++ b/src/bcdd/BCalcWrapper.py @@ -0,0 +1,45 @@ +''' +Wrapper class for libbcalcDDS.dll +''' + +import os +from ctypes import cdll, c_char_p, c_void_p + +from .Exceptions import DllNotFoundException + + +class BCalcWrapper(object): + + DENOMINATIONS = [ 'C', 'D', 'H', 'S', 'N' ] + PLAYERS = [ 'N', 'E', 'S', 'W' ] + + def __init__(self): + dllPath = os.path.join( + os.path.dirname(__file__), + '..') + try: + self.libbcalcdds = cdll.LoadLibrary(os.path.join( + dllPath, 'libbcalcdds.dll')) + except OSError: + try: + self.libbcalcdds = cdll.LoadLibrary(os.path.join( + dllPath, 'libbcalcdds.so')) + except OSError: + self.libbcalcdds = None + if self.libbcalcdds is None: + raise DllNotFoundException() + + def __getattr__(self, attrname): + def _dynamic_method(*args): + function = getattr(self.libbcalcdds, 'bcalcDDS_' + attrname) + if attrname == 'new': + function.restype = c_void_p + else: + if attrname == 'getLastError': + function.restype = c_char_p + function.argtypes = [c_void_p] + return function(*args) + return _dynamic_method + + def declarerToLeader(self, player): + return (player + 1) % 4 diff --git a/src/bcdd/DDTable.py b/src/bcdd/DDTable.py new file mode 100644 index 0000000..651aec3 --- /dev/null +++ b/src/bcdd/DDTable.py @@ -0,0 +1,98 @@ +from ctypes import c_char_p +from copy import copy + +from .BCalcWrapper import BCalcWrapper +from .Exceptions import DDTableInvalidException, FieldNotFoundException + + +class DDTable(object): + + def _get_empty_table(self): + table = [] + row = [-1] * 5 + for col in range(0, 4): + table.append(copy(row)) + return table + + def _validate_table(self, table): + for row in table: + for t in row: + if (t > 13) or (t < 0): + raise DDTableInvalidException( + 'Invalid number of tricks: %d' % (t) + ) + return table + + _banner_displayed = False + + def __init__(self, board): + self._board = board; + self._wrapper = BCalcWrapper() + + def _check_for_error(self, solver): + error = self._wrapper.getLastError(solver) + if error: + raise DDTableInvalidException( + 'BCalc error: %s' % (c_char_p(error).value.decode('ascii'))) + + def get_bcalc_table(self, show_banner=True): + if not DDTable._banner_displayed and show_banner: + print('Double dummy analysis provided by BCalc.') + print('BCalc is awesome, check it out: http://bcalc.w8.pl') + DDTable._banner_displayed = True + result = self._get_empty_table() + deal = self._board.get_layout() + solver = self._wrapper.new(b"PBN", deal.encode(), 0, 0) + self._check_for_error(solver) + for denom in range(0, 5): + self._wrapper.setTrumpAndReset(solver, denom) + for player in range(0, 4): + leader = self._wrapper.declarerToLeader(player) + self._wrapper.setPlayerOnLeadAndReset(solver, leader) + result[player][denom] = 13 - self._wrapper.getTricksToTake( + solver) + self._check_for_error(solver) + self._wrapper.delete(solver); + return self._validate_table(result) + + def get_jfr_table(self): + result = self._get_empty_table() + ability = self._board.get_ability() + abilities = self._board.validate_ability(ability) + for player_ability in abilities: + player = player_ability[0] + player_id = BCalcWrapper.PLAYERS.index(player) + denom_id = 4 + for tricks in player_ability[1]: + result[player_id][denom_id] = int(tricks, 16) + denom_id -= 1 + return self._validate_table(result) + + def get_pbn_table(self): + table = self._board.get_optimum_result_table() + parsed_table = self._board.validate_optimum_result_table(table) + result = self._get_empty_table() + for line_match in parsed_table: + player = line_match.group(1)[0] + denom = line_match.group(2)[0] + tricks = int(line_match.group(3)) + player_id = BCalcWrapper.PLAYERS.index(player) + denom_id = BCalcWrapper.DENOMINATIONS.index(denom) + result[player_id][denom_id] = tricks + return self._validate_table(result) + + def get_dd_table(self, show_banner=True): + try: + return self.get_jfr_table() + except FieldNotFoundException: + try: + return self.get_pbn_table() + except FieldNotFoundException: + return self.get_bcalc_table(show_banner) + + def print_table(self, dd_table): + print('\t' + '\t'.join(BCalcWrapper.DENOMINATIONS)) + for i in range(0, 4): + print('%s%s' % ( + self._wrapper.PLAYERS[i], + ''.join(['\t' + str(tricks) for tricks in dd_table[i]]))) diff --git a/src/bcdd/Exceptions.py b/src/bcdd/Exceptions.py new file mode 100644 index 0000000..fd81dd7 --- /dev/null +++ b/src/bcdd/Exceptions.py @@ -0,0 +1,18 @@ +class FileNotFoundError(Exception): + pass + + +class DllNotFoundException(Exception): + pass + + +class FieldNotFoundException(Exception): + pass + + +class ParScoreInvalidException(FieldNotFoundException): + pass + + +class DDTableInvalidException(FieldNotFoundException): + pass diff --git a/src/bcdd/PBNBoard.py b/src/bcdd/PBNBoard.py new file mode 100644 index 0000000..87105f2 --- /dev/null +++ b/src/bcdd/PBNBoard.py @@ -0,0 +1,242 @@ +import re + +from .BCalcWrapper import BCalcWrapper +from .Exceptions import FieldNotFoundException, DDTableInvalidException + + +class PBNField(object): + + def __init__(self, key=None, value=None, raw_data=None): + self.key = key + self.value = value + self.raw_field = '[%s "%s"]' % (self.key, str(self.value)) \ + if self.key is not None else raw_data + +class PBNBoard(object): + + line_pattern = re.compile(r'\[(.*) "(.*)"\]') + ability_pattern = re.compile(r'\b([NESW]):([0-9A-D]{5})\b') + optimum_result_table_pattern = re.compile( + r'^([NESW])\s+([CDHSN])T?\s+(\d+)$') + + def __init__(self, lines, line_no=None): + self.line_number = line_no + self._has_optimum_result_table = None + self._has_ability = None + self.fields = [] + for line in lines: + field = PBNField(raw_data=line) + line_parse = self.line_pattern.match(line) + if line_parse: + field.key = line_parse.group(1) + field.value = line_parse.group(2) + self.fields.append(field) + + def has_field(self, key): + for field in self.fields: + if key == field.key: + return True + return False + + def get_field(self, key): + for field in self.fields: + if key == field.key: + return field.value + raise FieldNotFoundException(key + ' field not found') + + def delete_field(self, key): + to_remove = [] + for field in self.fields: + if key == field.key: + to_remove.append(field) + for remove in to_remove: + self.fields.remove(remove) + + def get_event(self): + return self.get_field('Event') + + def write_event(self, name): + for i in range(0, len(self.fields)): + if 'Board' == self.fields[i].key: + self.fields.insert(i, PBNField(key='Event', value=name)) + break + + def get_layout(self): + return self.get_field('Deal') + + def get_number(self): + return self.get_field('Board') + + def get_vulnerable(self): + return self.get_field('Vulnerable') + + def get_dealer(self): + return self.get_field('Dealer') + + def validate_ability(self, ability): + matches = self.ability_pattern.findall(ability) + if not len(matches): + self._has_ability = False + raise DDTableInvalidException('Invalid Ability line: ' + ability) + players = [] + for match in matches: + if match[0] in players: + self._has_ability = False + raise DDTableInvalidException( + 'Duplicate entry in Ability: ' + match[0]) + else: + players.append(match[1]) + self._has_ability = False + return matches + + def get_ability(self): + return self.get_field('Ability') + + def delete_ability(self): + self.delete_field('Ability') + + def write_ability(self, dd_table): + sb = '' + for i in range(0, 4): + sb += BCalcWrapper.PLAYERS[i] + sb += ':' + sb += ''.join(['%X' % (j) for j in dd_table[i]])[::-1] + sb += ' ' + self.fields.append(PBNField(key='Ability', value=sb.strip())) + + def get_minimax(self): + return self.get_field('Minimax') + + def delete_minimax(self): + self.delete_field('Minimax') + + def write_minimax(self, contract): + minimax = '7NS0' if contract.score == 0 \ + else '%d%s%s%s%d' % ( + contract.level, + contract.denomination, + 'D' if contract.doubled else '', + contract.declarer, + contract.score) + self.fields.append(PBNField(key='Minimax', value=minimax)) + + def get_optimum_score(self): + return self.get_field('OptimumScore') + + def delete_optimum_score(self): + self.delete_field('OptimumScore') + + def write_optimum_score(self, contract): + self.fields.append( + PBNField(key='OptimumScore', + value='NS %d' % (contract.score))) + + def get_optimum_result(self): + return self.get_field('OptimumResult') + + def validate_optimum_result_table(self, table): + matches = [] + duplicates = [] + for line in table: + match = self.optimum_result_table_pattern.match(line) + if not match: + self._has_optimum_result_table = False + raise DDTableInvalidException( + 'Invalid OptimumResultTable line: ' + line) + position = match.group(1) + ' - ' + match.group(2) + if position in duplicates: + self._has_optimum_result_table = False + raise DDTableInvalidException( + 'Duplicate OptimumResultTable line: ' + line) + else: + duplicates.append(position) + matches.append(match) + self._has_optimum_result_table = True + return matches + + def get_optimum_result_table(self): + try: + return self.get_extended_raw_field('OptimumResultTable') + except FieldNotFoundException: + self._has_optimum_result_table = False + raise + + def get_auction(self): + return self.get_extended_raw_field('Auction', with_header=True) + + def get_extended_raw_field(self, field_name, with_header=False): + field_found = False + result = [] + for field in self.fields: + if field_name == field.key: + field_found = True + if with_header: + result.append(field.value) + else: + if field_found: + if field.key is None: + result.append(field.raw_field) + else: + break + if not field_found: + raise FieldNotFoundException('%s field not found' % (field_name)) + return result + + def delete_optimum_result_table(self): + field_found = False + to_remove = [] + for field in self.fields: + if 'OptimumResultTable' == field.key: + field_found = True + to_remove.append(field) + else: + if field_found: + if field.key is None: + to_remove.append(field) + else: + break + for remove in to_remove: + self.fields.remove(remove) + + def write_optimum_result_table(self, dd_table): + self.fields.append(PBNField( + key='OptimumResultTable', + value=r'Declarer;Denomination\2R;Result\2R')) + for i in range(0, 4): + for j in range(0, 5): + self.fields.append(PBNField( + raw_data='%s %s%s %d' % ( + BCalcWrapper.PLAYERS[i], + BCalcWrapper.DENOMINATIONS[j], + 'T' if BCalcWrapper.DENOMINATIONS[j] == 'N' else '', + dd_table[i][j]))) + + def save_par_contract(self, contract, jfr_only=False): + if not jfr_only: + self.delete_optimum_score() + self.write_optimum_score(contract) + self.delete_minimax() + self.write_minimax(contract) + + def save_dd_table(self, dd_table, jfr_only=False): + if not jfr_only: + if self._has_optimum_result_table is None: + try: + optimum_result_table = self.validate_optimum_result_table( + self.get_optimum_result_table()) + self._has_optimum_result_table = True + except FieldNotFoundException: + self._has_optimum_result_table = False + if not self._has_optimum_result_table: + self.delete_optimum_result_table() + self.write_optimum_result_table(dd_table) + if self._has_ability is None: + try: + ability = self.validate_ability( + self.get_ability()) + self._has_ability = True + except FieldNotFoundException: + self._has_ability = False + if not self._has_ability: + self.delete_ability() + self.write_ability(dd_table) diff --git a/src/bcdd/PBNFile.py b/src/bcdd/PBNFile.py new file mode 100644 index 0000000..f5436ad --- /dev/null +++ b/src/bcdd/PBNFile.py @@ -0,0 +1,43 @@ +import shutil +import tempfile + +from .PBNBoard import PBNBoard + +class PBNFile(object): + + def __init__(self, filename): + self._filename = filename + self.output_file = None + self.boards = [] + lines = [] + with open(self._filename) as pbn_file: + contents = pbn_file.readlines() + first_line = 1 + for line_no in range(0, len(contents)): + line = contents[line_no].strip() + if not line: + if len(lines) > 0: + self.boards.append(PBNBoard(lines, first_line)) + lines = [] + first_line = line_no + 2 + else: + lines.append(line) + if len(lines) > 0: + self.boards.append(PBNBoard(lines, first_line)) + if not self.boards[0].has_field('Event'): + self.boards[0].write_event('') + + def write_board(self, board): + if self.output_file is None: + self.output_file = tempfile.NamedTemporaryFile( + mode='w', delete=False) + for field in board.fields: + self.output_file.write(field.raw_field + '\r\n') + self.output_file.write('\r\n') + + def save(self): + if self.output_file is None: + raise IOError('No boards written to PBN file, unable to save it.') + tmp_path = self.output_file.name + self.output_file.close() + shutil.move(tmp_path, self._filename) diff --git a/src/bcdd/ParContract.py b/src/bcdd/ParContract.py new file mode 100644 index 0000000..918efae --- /dev/null +++ b/src/bcdd/ParContract.py @@ -0,0 +1,155 @@ +import functools + +from .BCalcWrapper import BCalcWrapper as bcw +from .Exceptions import ParScoreInvalidException + + +class ParContract(object): + def __init__(self, level=0, denom='', declarer='', doubled=False, score=0): + self.level = level + self.denomination = denom + self.declarer = declarer + self.doubled = doubled + self.score = score + + def validate(self): + if self.score == 0: + return self + if (self.level < 1) or (self.level > 7): + raise ParScoreInvalidException( + 'Invalid par contract level: %d' % (self.level)) + if self.denomination not in 'CDHSN': + raise ParScoreInvalidException( + 'Invalid par contract denomination: ' + self.denomination) + if self.declarer not in 'NESW': + raise ParScoreInvalidException( + 'Invalid par contract declarer: ' + self.declarer) + return self + + def __repr__(self): + if self.score == 0: + return 'PASS' + return '%d%s%s %s %+d' % ( + self.level, + self.denomination, + 'x' if self.doubled else '', + self.declarer, + self.score) + + def __eq__(self, other): + return hash(self) == hash(other) + + def __hash__(self): + return self.score + self.level + 10000 * ( + ord(self.denomination[0]) if self.denomination else 0) + + def calculate_score(self, tricks, vulnerable=False): + if self.level == 0: + return 0 + score = 0 + if self.level + 6 > tricks: + undertricks = self.level + 6 - tricks + if self.doubled: + while True: + if undertricks == 1: + # first undertrick: 100 non-vul, 200 vul + score -= 200 if vulnerable else 100 + else: + if (undertricks <= 3) and not vulnerable: + # second non-vul undertrick: 200 + score -= 200 + else: + # further undertricks: 300 + score -= 300 + undertricks -= 1 + if undertricks == 0: + break; + else: + score = -100 if vulnerable else -50 + score *= undertricks + else: + par_tricks = self.level + while True: + if (self.denomination == 'N') and (par_tricks == 1): + # first non-trump trick: 40 + score += 40 + else: + # other tricks + score += 30 if self.denomination in 'NSH' else 20 + par_tricks -= 1 + if par_tricks == 0: + break + overtricks = tricks - self.level - 6 + if self.doubled: + score *= 2 + score += 50 + # game premium + score += (500 if vulnerable else 300) if (score >= 100) else 50 + if self.doubled: + score += overtricks * (200 if vulnerable else 100) + else: + score += overtricks * (20 if self.denomination in 'CD' else 30) + if self.level == 7: + # grand slam premium + score += 1500 if vulnerable else 1000 + elif self.level == 6: + # small slam premium + score += 750 if vulnerable else 500 + if self.declarer in 'EW': + score = -score + return score + + def __gt__(self, other): + denomination = bcw.DENOMINATIONS.index(self.denomination) \ + if self.denomination in bcw.DENOMINATIONS \ + else -1 + other_denomination = bcw.DENOMINATIONS.index( + other.denomination) \ + if other.denomination in bcw.DENOMINATIONS else -1 + return (self.level > other.level) \ + or ((self.level == other.level) \ + and (denomination > other_denomination)) + + def get_defense(self, dd_table, vulnerable): + declarer_index = bcw.PLAYERS.index(self.declarer) \ + if self.declarer in bcw.PLAYERS else -1 + denomination_index = bcw.DENOMINATIONS.index(self.denomination) \ + if self.denomination in bcw.DENOMINATIONS else -1 + if (self.level != 0) \ + and (self.level + 6 + <= dd_table[declarer_index][denomination_index]): + defenders_indexes = [] + defenders_indexes.append((declarer_index + 1) % 4); + defenders_indexes.append((declarer_index + 3) % 4); + possible_defense = [] + score_squared = self.score * self.score + for i in range(0, 5): + level = self.level + if i <= denomination_index: + level += 1 + if level <= 7: + for defender in defenders_indexes: + if level + 6 > dd_table[defender][i]: + defense = ParContract( + level, + bcw.DENOMINATIONS[i], + bcw.PLAYERS[defender], + True, + 0) + defense.score = defense.calculate_score( + dd_table[defender][i], + vulnerable) + if score_squared > self.score * defense.score: + possible_defense.append(defense) + if possible_defense: + possible_defense.sort( + key=lambda x: abs(x.score - self.score)) + optimum_defense = possible_defense[-1] + possible_defense = [defense for defense in possible_defense + if defense.score == optimum_defense.score] + for defense in possible_defense: + # Lowest from the most profitable sacrifices + if optimum_defense > defense: + optimum_defense = defense + return optimum_defense + return None diff --git a/src/bcdd/ParScore.py b/src/bcdd/ParScore.py new file mode 100644 index 0000000..e23a839 --- /dev/null +++ b/src/bcdd/ParScore.py @@ -0,0 +1,171 @@ +import re + +from .BCalcWrapper import BCalcWrapper as bcw +from .ParContract import ParContract +from .Exceptions import FieldNotFoundException + + +class ParScore(object): + _pbn_contract_pattern = re.compile(r'(\d)([CDHSN])(X?)\s+([NESW])') + _pbn_score_pattern = re.compile(r'(NS|EW)\s+(-?\d})') + _jfr_contract_pattern = re.compile(r'^(\d)([CDHSN])(D?)([NESW])(-?\d+)$') + + def __init__(self, board): + self._board = board + + def get_pbn_par_contract(self): + contract_field = self._board.get_optimum_result() + if 'Pass' == contract_field: + return ParContract() + contract_match = self._pbn_contract_pattern.match(contract_field) + if not contract_match: + raise ParScoreInvalidException( + 'Invalid format for OptimumResult field: ' + contract_field) + score_field = self._board.get_optimum_score() + score_match = self._pbn_score_pattern.match(score_field) + if not score_match: + raise ParScoreInvalidException( + 'Invalid format for OptimumScore field: ' + scoreField) + score = int(score_match.group(2)) + if 'EW' == score_match.group(1): + score = -score + contract = ParContract( + int(contract_match.group(1)), + contract_match.group(2)[0], + contract_match.group(4)[0], + 'X' == contract_match.group(3), + score) + return contract.validate() + + def get_jfr_par_contract(self): + par_string = self._board.get_minimax() + par_match = self._jfr_contract_pattern.match(par_string) + if not par_match: + raise ParScoreInvalidException( + 'Invalid format for Minimax field: ' + par_string) + if '0' == par_match.group(4): + return ParContract() # pass-out + contract = ParContract( + int(par_match.group(1)), + par_match.group(2)[0], + par_match.group(4)[0], + 'D' == par_match.group(3), + int(par_match.group(5))) + return contract.validate() + + def _determine_vulnerability(self, vulnerability, declarer): + vulnerability = vulnerability.upper() + return vulnerability in ['ALL', 'BOTH'] \ + or (vulnerability not in ['LOVE', 'NONE'] \ + and declarer in vulnerability) + + def _get_highest_makeable_contract(self, dd_table, + for_ns=True, for_ew=True): + contract = ParContract() + tricks = 0 + for i in range(3, -1, -1): + if ((i % 2 == 0) and for_ns) \ + or ((i % 2 == 1) and for_ew): + for j in range(0, 5): + level = dd_table[i][j] - 6 + denomination = bcw.DENOMINATIONS.index( + contract.denomination) \ + if contract.denomination in bcw.DENOMINATIONS \ + else -1 + if (level > contract.level) \ + or ((level == contract.level) \ + and (j > denomination)): + contract.level = level + contract.denomination = bcw.DENOMINATIONS[j] + contract.declarer = bcw.PLAYERS[i] + tricks = dd_table[i][j] + vulnerability = self._board.get_vulnerable().upper() + vulnerable = self._determine_vulnerability( + vulnerability, contract.declarer) + contract.score = contract.calculate_score(tricks, vulnerable) + return contract + + def get_dd_table_par_contract(self, dd_table): + dealer = self._board.get_dealer() + vulnerability = self._board.get_vulnerable().upper() + ns_highest = self._get_highest_makeable_contract( + dd_table, True, False); + ew_highest = self._get_highest_makeable_contract( + dd_table, False, True) + if ns_highest == ew_highest: + return ns_highest.validate() \ + if dealer in ['N', 'S'] else ew_highest.validate() + highest = max(ns_highest, ew_highest) + other_side_highest = min(ew_highest, ns_highest) + ns_playing = highest.declarer in ['N', 'S'] + defense_vulnerability = self._determine_vulnerability( + vulnerability, 'E' if ns_playing else 'N') + highest_defense = highest.get_defense(dd_table, defense_vulnerability) + if highest_defense is not None: + # Highest contract has profitable defense + return highest_defense.validate() + denomination_index = bcw.DENOMINATIONS.index(highest.denomination) \ + if highest.denomination in bcw.DENOMINATIONS \ + else -1 + declarer_index = bcw.PLAYERS.index(highest.declarer) \ + if highest.declarer in bcw.PLAYERS else -1 + player_indexes = [declarer_index, (declarer_index + 2) % 4] + vulnerable = self._determine_vulnerability( + vulnerability, highest.declarer) + score_squared = highest.score * highest.score + possible_optimums = [] + for i in range(0, 5): + for player in player_indexes: + level = highest.level + if i > denomination_index: + level -= 1 + while level > 0: + contract = ParContract( + level, + bcw.DENOMINATIONS[i], + bcw.PLAYERS[player], + False, 0) + contract.score = contract.calculate_score( + dd_table[player][i], vulnerable) + if other_side_highest > contract: + # Contract is lower than other side's contract + break + if (highest.score * contract.score) > 0: + # Contract makes + if abs(contract.score) >= abs(highest.score): + # Contract is profitable + defense = contract.get_defense( + dd_table, defense_vulnerability) + if defense is not None \ + and (contract.score * contract.score) \ + > (contract.score * defense.score): + # Contract has defense + possible_optimums.append(defense) + # So lower contracts will too. + break + else: + # Contract does not have defense + possible_optimums.append(contract) + else: + # Contract is not profitable + break + level -= 1 + for contract in possible_optimums: + if abs(contract.score) > abs(highest.score): + # Contract is more profitable + highest = contract + else: + if contract.score == highest.score: + if highest > contract: + # Equally profitable, but lower + highest = contract + return highest.validate() + + def get_par_contract(self, dd_table): + try: + return self.get_jfr_par_contract() + except FieldNotFoundException: + try: + return self.get_pbn_par_contract() + except FieldNotFoundException: + return self.get_dd_table_par_contract(dd_table) diff --git a/src/bcdd/__init__.py b/src/bcdd/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/bcdd/__init__.py |