import bisect  # used only by a commented-out alternative in Probabilities.update()
import hashlib
import math
import random

import numpy as np


def encode(v):
    """Pack a bit sequence into a bytearray, most-significant bit first.

    If len(v) is not a multiple of 8, the trailing bits land in the
    low-order positions of the final byte.
    """
    byte_values = []
    for i in range(0, math.ceil(len(v) / 8)):
        x = 0
        for j in range(0, 8):
            index = i * 8 + j
            if index >= len(v):
                continue
            x <<= 1
            x |= int(v[index])
        byte_values.append(x)
    return bytearray(byte_values)


def sha(v):
    """Return the least-significant bit of the first byte of SHA-256(encode(v))."""
    x = encode(v)
    m = hashlib.sha256()
    m.update(x)
    result = m.digest()
    return result[0] & 0b1


def hamming_distance(a, b, scratch):
    """Hamming distance between bit vectors a and b; scratch is a reusable buffer."""
    np.logical_xor(a, b, scratch)
    return int(np.sum(scratch))


def index_hash(indices):
    """Canonical string id for a set of term indices, e.g. [2, 0] -> '0,2'."""
    return ','.join([str(index) for index in sorted(indices)])


class Candidate:
    """A conjunctive term: the product of the input bits at `indices`."""

    def __init__(self, indices):
        self.indices = indices[:]

    def evaluate(self, x):
        # len(x) is the reserved "null" index; any candidate containing it
        # evaluates to 0.
        if len(x) in self.indices:
            return 0
        value = 1
        for index in self.indices:
            value *= x[index]
        return value

    def id(self):
        return index_hash(self.indices)

    def eval_str(self):
        parts = []
        for index in self.indices:
            parts.append('x[' + str(index) + ']')
        return '*'.join(parts)
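
# Illustrative sanity check (hypothetical helper, not used by the search):
# shows the MSB-first packing that encode() applies and the dual-rail layout
# produced by Probabilities.randomize_inputs(), where logical bit j is stored
# as (x[2j], x[2j+1]) = (bit, bit ^ 1).
def _demo_bit_packing():
    v = [1, 0, 1, 0, 1, 0, 1, 0, 0, 1, 0, 1, 0, 1, 0, 1]
    assert list(encode(v)) == [0b10101010, 0b01010101]
    return sha(v)  # the single target bit the search below tries to model
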
class Probabilities:
    """Estimation-of-distribution search state.

    p is a distribution over the actual_N input wires plus one reserved
    "null" index at position actual_N; candidate conjunctive terms are
    sampled from it.
    """

    def __init__(self):
        self.N = 8
        self.actual_N = self.N * 2  # dual-rail: two wires per logical bit
        self.num_terms = 1
        self.num_candidates = 100
        self.sample_size = 64
        self.p = np.zeros((self.actual_N + 1,))
        self.p_temp = np.empty_like(self.p)
        self.next_p = np.empty_like(self.p)
        self.knowns = []
        self.stops = set()
        self.reset_p()
        self.epoch = 0

        self.inputs = np.zeros((self.sample_size, self.actual_N), dtype=np.int32)
        self.distances = np.zeros((self.sample_size, self.sample_size))
        self.xor_square = np.zeros((self.sample_size, self.sample_size))
        self.base_outputs = np.zeros(self.sample_size, dtype=np.int32)
        self.outputs = np.zeros(self.sample_size, dtype=np.int32)
        self.expected_outputs = np.zeros(self.sample_size, dtype=np.int32)
        self.output_xor = np.zeros(self.sample_size, dtype=np.int32)
        self.max_coherences = np.zeros((self.actual_N + 1))
        self.max_candidates = [None for _ in range(0, self.actual_N)]
        self.layers = []
        self.base = None
        self.rings = []
        self.scratch = np.zeros((self.actual_N,))

        self.last_value = -1
        self.rounds = 0
        self.average_delta_over_null = 0

    def randomize_inputs(self):
        # Each logical bit j becomes the dual-rail pair (val, val ^ 1).
        for i in range(0, self.sample_size):
            for j in range(0, self.N):
                val = random.randint(0, 1)
                self.inputs[i][j * 2] = val
                self.inputs[i][j * 2 + 1] = val ^ 1

    def populate_distances(self):
        # Pairwise weights fall off exponentially with Hamming distance.
        for i in range(0, len(self.inputs)):
            x_a = self.inputs[i]
            for j in range(0, len(self.inputs)):
                if i == j:
                    continue
                x_b = self.inputs[j]
                distance = hamming_distance(x_a, x_b, self.scratch)
                self.distances[i][j] = 1.0 / (2 ** distance)

    def compute_rings(self):
        # For each sample, collect the indices of its nearest neighbours.
        self.rings = []
        for i in range(0, len(self.inputs)):
            x_a = self.inputs[i]
            min_distance = self.actual_N
            indices = []
            for j in range(0, len(self.inputs)):
                if i == j:
                    continue
                x_b = self.inputs[j]
                distance = hamming_distance(x_a, x_b, self.scratch)
                if distance < min_distance:
                    min_distance = distance
                    indices = [j]
                elif distance == min_distance:
                    indices.append(j)
            self.rings.append(indices)

    def compute_expected_outputs(self):
        for i in range(0, len(self.inputs)):
            self.expected_outputs[i] = sha(self.inputs[i])

    def compute_base_outputs(self):
        if self.base is None:
            self.base_outputs.fill(0)
            return
        for i in range(0, len(self.inputs)):
            self.base_outputs[i] = self.base(self.inputs[i])

    def coherence(self, outputs=None):
        # Distance-weighted agreement of the error pattern: pairs of samples
        # that are both right or both wrong add their weight to the numerator.
        if outputs is None:
            outputs = self.outputs
        np.logical_xor(outputs, self.expected_outputs, self.output_xor)
        coherences = []
        for i in range(0, len(self.output_xor)):
            y_a = self.output_xor[i]
            numerator = 0
            denominator = 0
            for j in range(0, len(self.output_xor)):
                if i == j:
                    continue
                y_b = self.output_xor[j]
                weight = self.distances[i][j]
                denominator += weight
                if y_a == y_b:
                    numerator += weight
            coherence = numerator / denominator if denominator > 0 else 0
            coherences.append(coherence)
        return sum(coherences) / len(coherences)

    def ring_coherence(self, outputs=None):
        # Like coherence(), but only nearest-neighbour rings vote.
        if outputs is None:
            outputs = self.outputs
        np.logical_xor(outputs, self.expected_outputs, self.output_xor)
        total = 0
        for i in range(0, len(self.output_xor)):
            y_a = self.output_xor[i]
            indices = self.rings[i]
            coherence = sum([1 if self.output_xor[j] == y_a else 0 for j in indices]) / len(indices)
            total += coherence
        return total / len(self.output_xor)

    def normalize_p(self):
        check = self.knowns[:]
        for i in range(0, len(self.p)):
            if self.p[i] < 0:
                self.p[i] = 0
        for i in range(0, len(self.p)):
            if i in self.knowns:
                # Zero a known wire and its dual-rail complement (i ^ 1).
                flip = i ^ 0b1
                self.p[i] = 0.0
                self.p[flip] = 0.0
            else:
                # Zero any index that would recreate a stopped combination.
                check.append(i)
                stop_id = index_hash(check)
                check.pop()
                if stop_id in self.stops:
                    self.p[i] = 0.0
        total = np.sum(self.p)
        if total > 0:
            for i in range(0, len(self.p)):
                self.p[i] = self.p[i] / total

    def reset_p(self):
        self.p.fill(1.0)
        self.normalize_p()

    def threshold(self):
        # return (1.0 / (self.num_terms - len(self.knowns))) - (self.epoch / 100)
        return 1.0 - (self.epoch / 100)

    def get_converged_index(self):
        for i in range(0, len(self.p)):
            if self.p[i] > self.threshold():
                return i
        return None

    def add_layer(self):
        self.add_stop()
        layer = Candidate(self.knowns)
        self.layers.append(layer)
        self.base = self.cache_layers()
        self.knowns.pop()
        self.reset_p()

    def random_sample(self):
        self.randomize_inputs()
        self.populate_distances()
        # self.compute_rings()
        self.compute_expected_outputs()
        self.compute_base_outputs()
        return self.coherence(self.base_outputs)
        # return self.ring_coherence(self.base_outputs)

    def random_candidate(self):
        indices = self.knowns[:]
        np.copyto(self.p_temp, self.p)
        self.p_temp[self.actual_N] = 0  # never sample the null index directly
        total = np.sum(self.p_temp)
        if total == 0:
            return None
        np.divide(self.p_temp, total, self.p_temp)
        for _ in range(0, self.num_terms - len(self.knowns)):
            index = np.random.choice(len(self.p_temp), 1, p=self.p_temp)[0]
            indices.append(index)
            flip = index ^ 0b1
            self.p_temp[index] = 0
            self.p_temp[flip] = 0
            # Prune indices that would recreate a stopped combination,
            # then renormalize before the next draw.
            for i in range(0, len(self.p_temp)):
                if i not in indices:
                    indices.append(i)
                    stop_id = index_hash(indices)
                    indices.pop()
                    if stop_id in self.stops:
                        self.p_temp[i] = 0.0
            total = np.sum(self.p_temp)
            if total == 0:
                return None
            np.divide(self.p_temp, total, self.p_temp)
        return Candidate(indices)

    def add_stop(self):
        stop_id = index_hash(self.knowns)
        self.stops.add(stop_id)
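    # The search step, implemented by update() below:
    #   1. draw a fresh random sample and score the current base function;
    #   2. sample up to num_candidates distinct candidate terms from p and
    #      score each by the coherence of (base XOR term) with the target;
    #   3. nudge p toward the indices of the best-scoring terms, renormalize;
    #   4. once the running delta-over-null settles, lock in the argmax
    #      index, or backtrack via the stop set / grow num_terms when the
    #      null index wins.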
    def update(self):
        self.epoch += 1
        base_coherence = self.random_sample()
        self.max_coherences.fill(0)
        for i in range(0, self.actual_N):
            self.max_candidates[i] = None
        visited = set()
        has_candidate = False
        # np.copyto(self.next_p, self.p)
        for _ in range(0, self.num_candidates):
            candidate = self.random_candidate()
            if candidate is None:
                continue
            candidate_id = candidate.id()
            if candidate_id in visited:
                continue
            visited.add(candidate_id)
            if self.actual_N in candidate.indices:
                continue
            has_candidate = True
            for i in range(0, len(self.inputs)):
                self.outputs[i] = self.base_outputs[i] ^ candidate.evaluate(self.inputs[i])
            # coherence = self.ring_coherence()
            coherence = self.coherence()
            # if coherence <= base_coherence:
            #     continue
            # for index in candidate.indices:
            #     self.next_p[index] += (coherence - base_coherence) * (1 / 1000.0)
            #     self.p_temp[index] += 0
            for index in candidate.indices:
                if coherence > self.max_coherences[index]:
                    self.max_coherences[index] = coherence
                    self.max_candidates[index] = candidate
                # self.max_coherences[index] = max(self.max_coherences[index], coherence)
        # np.copyto(self.p, self.next_p)
        # np.copyto(self.p_temp, self.p)
        for i in range(0, self.actual_N):
            candidate = self.max_candidates[i]
            if candidate is None:
                continue
            for index in candidate.indices:
                self.p[index] += (self.max_coherences[index] - base_coherence) * (1 / 1000.0)
            # print(i, self.max_coherences[i] - base_coherence, self.max_candidates[i].id())
        self.normalize_p()
        # print(self.p)
        # np.subtract(self.p_temp, self.p, self.p_temp)
        # np.abs(self.p_temp, self.p_temp)
        # delta = np.sum(self.p_temp) / len(self.p_temp)
        # print(delta, np.argmax(self.p))
        # np.copyto(self.p_temp, self.p)
        # for i in range(0, len(self.p_temp)):
        #     self.p_temp[i] = round(self.p_temp[i] * 100) / 100
        # print(self.p_temp)
        index = np.argmax(self.p)
        delta_over_null = self.p[index] - self.p[self.actual_N]
        if self.epoch == 1:  # first epoch after a reset; epoch was incremented above
            self.average_delta_over_null = delta_over_null
        else:
            self.average_delta_over_null = 0.9 * self.average_delta_over_null + 0.1 * delta_over_null
        diff = self.num_terms - len(self.knowns)
        print(self.average_delta_over_null, np.argpartition(self.p, -diff)[-diff:], np.argmax(self.p))
        # Always iterate for a minimum number of epochs
        if self.epoch < 15:
            return
        if self.average_delta_over_null > 0.00001 and self.average_delta_over_null < 0.001 and self.epoch < 300:
            return
        if self.average_delta_over_null < 0.001:
            index = self.actual_N
        else:
            index = np.argmax(self.p)
        # index = np.argmax(self.p)
        # if index == self.last_value:
        #     self.rounds += 1
        # else:
        #     self.rounds = 0
        # self.last_value = index
        # if self.rounds < 10 and self.epoch < 100:
        #     return
        # if self.epoch < 5 or (delta > 0.001 and self.epoch < 50):
        #     return
        # index = np.argmax(self.p)
        # print(self.p)
        # print(self.threshold())
        # print(self.p)
        # index = self.get_converged_index()
        # index is always set here; the check mirrors the commented
        # get_converged_index() variant, which can return None.
        if index is not None or not has_candidate:
            # print(index, delta, np.argmax(self.p))
            self.epoch = 0
            if index == self.actual_N or not has_candidate:
                if len(self.knowns) > 0:
                    self.add_stop()
                    self.knowns.pop()
                    print('Backtrack: ' + str(self.knowns))
                    self.reset_p()
                    return
                self.num_terms += 1
                self.knowns = []
                self.stops = set()
                self.reset_p()
                print(self.num_terms)
                return
            self.knowns.append(index)
            # bisect.insort(self.knowns, index)
            if len(self.knowns) == self.num_terms:
                print('Add layer: ' + str(self.knowns))
                self.add_layer()
            else:
                print('Found term: ' + str(self.knowns))
                self.reset_p()
            print(base_coherence)
        return

    def cache_layers(self):
        # Compile the accumulated layers into a single function:
        # f(x) = XOR of each layer's conjunctive term.
        expr = 'def f(x):\n\tresult=0\n'
        for layer in self.layers:
            expr += '\tresult^=' + layer.eval_str() + '\n'
        expr += '\treturn result\n'
        scope = {}
        exec(expr, scope)
        return scope['f']


def main():
    probabilities = Probabilities()
    while probabilities.num_terms <= probabilities.N:
        probabilities.update()


if __name__ == "__main__":
    main()
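
# Hypothetical usage sketch (values chosen for illustration only): once a full
# set of term indices has converged, add_layer() compiles the XOR of all
# layers into a plain Python function via cache_layers().
#
#   p = Probabilities()
#   p.knowns = [0, 2]      # pretend indices 0 and 2 converged
#   p.add_layer()          # p.base becomes f(x) = x[0]*x[2]
#   p.base([1, 0, 1, 0, 1, 0, 1, 0, 0, 1, 0, 1, 0, 1, 0, 1])  # -> 1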