from enum import unique import hashlib import math import numpy as np import random import time N = 8 M = 2 def vec_to_int(x): global N z = 0 for i in range(0, N + 1): z <<= 1 z |= x[i] return z def timeit(f): def timed(*args, **kw): ts = time.time() result = f(*args, **kw) te = time.time() print('func:%r took: %2.4f sec' % (f.__name__, te-ts)) return result return timed class Candidate: def __init__(self, layer): global N self.layer = layer self.node_count = 2 ** layer self.offsets = np.zeros((self.node_count, N + 1)).astype(np.int32) class Probabilities: def __init__(self, layer): global N self.layer = layer self.node_count = 2 ** layer self.p_offsets = np.zeros((self.node_count, N + 1)) self.p_offsets.fill(0.5) self.offset_coherences = np.zeros((2, self.node_count, N + 1, 2, self.node_count, N + 1)) self.offset_coherences.fill(-1) def inertia(self): global N total = 0 for i in range(0, self.node_count): for j in range(0, N + 1): if self.p_offsets[i][j] > 1e-2 and self.p_offsets[i][j] < (1 - 1e-2): total += abs(self.offset_coherences[1][i][j][1][i][j] - self.offset_coherences[0][i][j][0][i][j]) return total def flatten(self): candidate = Candidate(self.layer) for i in range(0, self.node_count): for j in range(0, N + 1): candidate.offsets[i][j] = 1 if self.p_offsets[i][j] >= 0.5 else 0 if self.node_count > 1: for i in range(0, self.node_count): if not candidate.offsets[i].any(): q = i ^ 0b1 candidate.offsets[q].fill(0) return candidate def clamp(x, min_value = 0.01, max_value = 1): return min(max(x, min_value), max_value) def encode(v): global N byte_values = [] for i in range(0, math.ceil(N / 8)): x = 0 for j in range(0, 8): index = i * 8 + j if index >= len(v): continue x <<= 1 x |= int(v[index]) byte_values.append(x) return bytearray(byte_values) # 00100111 x4 # 00000110 x1 def sha(v): global M x = encode(v) m = hashlib.sha256() m.update(x) result = m.digest() return result[0] % M def xor(x): num_one_bits = 0 for i in range(0, len(x)): if i == 0: continue num_one_bits += x[i] return num_one_bits % 2 def test_fn(x): # 0 1 # 2 | 3 # 4 | 5 | 6 | 7 # | | 0 | 7 | | | | return x[0] ^ x[1] ^ ((x[2] ^ (x[4] * (x[5] ^ (x[0] * x[7])))) * (x[3] ^ (x[6] * x[7]))) def candidate_fn(x): return x[0] ^ x[1] ^ (~(x[2] ^ x[3]) * x[2]) def true_fn(x): return x[0] ^ x[1] ^ (x[3] * x[2]) def hamming_distance(a, b, scratch): np.logical_xor(a, b, scratch) return sum(scratch) def coherence(outputs, distances): coherences = [] for i in range(0, len(outputs)): y_a = outputs[i] numerator = 0 denominator = 0 for j in range(0, len(outputs)): if i == j: continue y_b = outputs[j] weight = distances[i][j] denominator += weight if y_a == 0 and y_b == 0 or y_a == 1 and y_b == 1: numerator += weight coherence = numerator / denominator if denominator > 0 else 0 coherences.append(coherence) return sum(coherences) / len(coherences) def random_sample(m, n): inputs = np.zeros((m, n + 1)).astype(np.int32) for i in range(0, m): for j in range(0, n): inputs[i][j] = random.randint(0, 1) inputs[i][n] = 1 return inputs def populate_distances(inputs, distances, scratch): for i in range(0, len(inputs)): x_a = inputs[i] for j in range(0, len(inputs)): if i == j: continue x_b = inputs[j] distance = hamming_distance(x_a, x_b, scratch) distances[i][j] = 1.0 / (2 ** distance) def populate_layers_scratch(layers, x, layers_scratch, compute_scratch): layers_scratch[0].fill(0) for i in range(1, len(layers_scratch)): scratch = layers_scratch[i] layer = layers[i - 1] for j in range(0, layer.node_count): value = 0 np.multiply(layer.offsets[j], x, compute_scratch) value ^= np.sum(compute_scratch) % 2 left = layers_scratch[i - 1][j * 2] right = layers_scratch[i - 1][j * 2 + 1] value ^= left * right scratch[j] = value return layers_scratch[-1][0] def evaluate_cached(candidate, x, layers_scratch, layers_scratch_base, compute_scratch): global N maybe_evaluate = set() for j in range(0, candidate.node_count, 2): value = 0 np.multiply(candidate.offsets[j], x, compute_scratch) value ^= np.sum(compute_scratch) % 2 layers_scratch[0][j] = value if candidate.node_count > 1: value = 0 np.multiply(candidate.offsets[j + 1], x, compute_scratch) value ^= np.sum(compute_scratch) % 2 layers_scratch[0][j + 1] = value if layers_scratch[0][j] == 1 and layers_scratch[0][j + 1] == 1: maybe_evaluate.add(int(j / 2)) for i in range(1, len(layers_scratch)): np.copyto(layers_scratch[i], layers_scratch_base[i]) maybe_evaluate_next = set() for j in maybe_evaluate: left = layers_scratch[i - 1][j * 2] right = layers_scratch[i - 1][j * 2 + 1] child_value = left * right left_base = layers_scratch_base[i - 1][j * 2] right_base = layers_scratch_base[i - 1][j * 2 + 1] child_base_value = left_base * right_base if child_value != child_base_value: layers_scratch[i][j] ^= 1 maybe_evaluate_next.add(int(j / 2)) maybe_evaluate = maybe_evaluate_next return layers_scratch[-1][0] def evaluate(layers, candidate, x, layers_scratch, compute_scratch): global N for i in range(0, len(layers_scratch)): scratch = layers_scratch[i] if i == 0: for j in range(0, candidate.node_count): value = 0 np.multiply(candidate.offsets[j], x, compute_scratch) value ^= np.sum(compute_scratch) % 2 scratch[j] = value else: layer = layers[i - 1] for j in range(0, layer.node_count): value = 0 np.multiply(layer.offsets[j], x, compute_scratch) value ^= np.sum(compute_scratch) % 2 left = layers_scratch[i - 1][j * 2] right = layers_scratch[i - 1][j * 2 + 1] value ^= left * right scratch[j] = value return layers_scratch[-1][0] @timeit def compute_scores(probabilities, candidates, num_candidates, layers, scores, distances, inputs, outputs, output_xor, expected_outputs, sample_size, layers_scratch, layers_scratch_base, int_scratch, scratch): global M, N scores.fill(0) unique_candidates = {} for j in range(0, num_candidates): create_candidate(probabilities, candidates[j]) unique_candidates[candidate_str(candidates[j])] = j for i in range(0, sample_size): populate_layers_scratch(layers, inputs[i], layers_scratch_base, int_scratch) for _, j in unique_candidates.items(): candidate = candidates[j] outputs[j][i] = evaluate_cached(candidate, inputs[i], layers_scratch, layers_scratch_base, int_scratch) # if outputs[j][i] != evaluate(layers, candidate, inputs[i], layers_scratch, int_scratch): # print('Uh-oh') for _, j in unique_candidates.items(): candidate = candidates[j] np.subtract(outputs[j], expected_outputs, output_xor) np.mod(output_xor, M, output_xor) scores[j] = coherence(output_xor, distances) @timeit def update_probabilities(probabilities, candidates, inputs, scores): global N num_candidates = len(candidates) variance = np.zeros((N + 1,)) for x in inputs: variance += x probabilities.offset_coherences.fill(-1) for p in range(0, num_candidates): candidate = candidates[p] score = scores[p] if score == 0: continue for j in range(0, probabilities.node_count): for k in range(0, N + 1): i = candidate.offsets[j][k] for m in range(0, probabilities.node_count): for n in range(0, N + 1): l = candidate.offsets[m][n] probabilities.offset_coherences[i][j][k][l][m][n] = max(score, probabilities.offset_coherences[i][j][k][l][m][n]) # for i in range(0, 2): # for j in range(0, probabilities.node_count): # for k in range(0, N + 1): # for l in range(0, 2): # for m in range(0, probabilities.node_count): # for n in range(0, N + 1): # offset_max = 0 # offset_sum = 0 # offset_count = 0 # for p in range(0, num_candidates): # candidate = candidates[p] # if candidate.offsets[j][k] != i: # continue # if candidate.offsets[m][n] != l: # continue # if scores[p] == 0: # continue # offset_max = max(offset_max, scores[p]) # offset_sum += scores[p] # offset_count += 1 # if offset_max == 0: # continue # probabilities.offset_coherences[i][j][k][l][m][n] = offset_max p_offsets_next = np.zeros((probabilities.node_count, N + 1)) inertia = 0 for j in range(0, probabilities.node_count): for k in range(0, N + 1): delta = 0 count = 0 for m in range(0, probabilities.node_count): for n in range(0, N + 1): if j == m and k == n: continue # confidence = variance[k] * variance[n] / (len(inputs) ** 2) confidence = 1.0 p_j1_if_m0 = probabilities.offset_coherences[1][j][k][0][m][n] p_j0_if_m0 = probabilities.offset_coherences[0][j][k][0][m][n] p_j1_if_m1 = probabilities.offset_coherences[1][j][k][1][m][n] p_j0_if_m1 = probabilities.offset_coherences[0][j][k][1][m][n] if p_j1_if_m0 >= 0 and p_j0_if_m0 >= 0: delta_if_m0 = p_j1_if_m0 - p_j0_if_m0 delta += delta_if_m0 * (1.0 - probabilities.p_offsets[m][n]) * confidence count += 1 if p_j1_if_m1 >= 0 and p_j0_if_m1 >= 0: delta_if_m1 = p_j1_if_m1 - p_j0_if_m1 delta += delta_if_m1 * probabilities.p_offsets[m][n] * confidence count += 1 if count > 0: delta /= count p_offsets_next[j][k] = clamp(probabilities.p_offsets[j][k] + delta, 0, 1) inertia += abs(p_offsets_next[j][k] - probabilities.p_offsets[j][k]) for j in range(0, probabilities.node_count): for k in range(0, N + 1): p_offset_next = p_offsets_next[j][k] probabilities.p_offsets[j][k] = 0.9 * probabilities.p_offsets[j][k] + 0.1 * p_offset_next # if probabilities.node_count > 1: # for j in range(0, probabilities.node_count): # q = j ^ 0b1 # for k in range(0, N + 1): # if probabilities.p_offsets[j][k] > 0.5: # probabilities.p_offsets[q][k] = min(probabilities.p_offsets[q][k], 1 - probabilities.p_offsets[j][k]) return inertia def create_candidate(probabilities, candidate): global N for i in range(0, probabilities.node_count): for j in range(0, N + 1): candidate.offsets[i][j] = 1 if random.random() < probabilities.p_offsets[i][j] else 0 def copy_candidate(src, dest): global N for i in range(0, src.node_count): for j in range(0, N + 1): dest.offsets[i][j] = src.offsets[i][j] def p(x): return math.ceil(x * 100) / 100 def p_a(x): return [p(z) for z in x] def print_probabilities(probabilities): print('=====================') for i in range(0, probabilities.node_count): print(i, p_a(probabilities.p_offsets[i])) print('=====================') def candidate_str(candidate): global N build_str = '' for i in range(0, candidate.node_count): for j in range(0, N + 1): build_str += str(candidate.offsets[i][j]) return build_str def main(): global N, M sample_size = 64 num_candidates = 100 num_survivors = 8 output_xor = np.zeros(sample_size,) scratch = np.zeros((N + 1,)) int_scratch = np.zeros((N + 1,)).astype(np.int32) g = test_fn expected_outputs = np.zeros((sample_size,)) inputs = random_sample(sample_size, N) distances = np.zeros((sample_size, sample_size)) populate_distances(inputs, distances, scratch) for i in range(0, sample_size): expected_outputs[i] = g(inputs[i]) outputs = np.zeros((num_candidates + num_survivors, sample_size,)) scores = np.zeros((num_candidates + num_survivors,)) layers = [] layers_scratch = [np.zeros(1, ).astype(np.int32)] layers_scratch_base = [np.zeros(1, ).astype(np.int32)] layer = 0 # for i in range(0, sample_size): # outputs[0][i] = candidate_fn(inputs[i]) np.subtract(outputs[0], expected_outputs, output_xor) np.mod(output_xor, M, output_xor) score = coherence(output_xor, distances) # print(score) # for i in range(0, sample_size): # outputs[0][i] = true_fn(inputs[i]) # np.subtract(outputs[0], expected_outputs, output_xor) # np.mod(output_xor, M, output_xor) # score = coherence(output_xor, distances) # print(score) # return while score < 1: probabilities = Probabilities(layer) candidates = [Candidate(layer) for _ in range(0, num_candidates + num_survivors)] inertia = 1 while inertia > 0.01: compute_scores(probabilities, candidates, num_candidates, layers, scores, distances, inputs, outputs, output_xor, expected_outputs, sample_size, layers_scratch, layers_scratch_base, int_scratch, scratch) round_inertia = update_probabilities(probabilities, candidates, inputs, scores) inertia = 0.9 * inertia + 0.1 * round_inertia print_probabilities(probabilities) for candidate in layers: print(candidate.offsets) print(np.max(scores), round_inertia, inertia) top_n = sorted(range(len(scores)), key=lambda i: scores[i])[-num_survivors:] for i in range(0, num_survivors): src_index = top_n[i] dest_index = num_candidates + i if src_index == dest_index: continue src = candidates[src_index] dest = candidates[dest_index] candidates[dest_index] = src candidates[src_index] = dest inputs = random_sample(sample_size, N) populate_distances(inputs, distances, scratch) for i in range(0, sample_size): expected_outputs[i] = g(inputs[i]) candidate = probabilities.flatten() for j in range(0, sample_size): outputs[0][j] = evaluate(layers, candidate, inputs[j], layers_scratch, int_scratch) np.subtract(outputs[0], expected_outputs, output_xor) np.mod(output_xor, M, output_xor) score = coherence(output_xor, distances) layers.insert(0, candidate) layer += 1 layers_scratch.insert(0, np.zeros(2 ** layer,).astype(np.int32)) layers_scratch_base.insert(0, np.zeros(2 ** layer,).astype(np.int32)) for candidate in layers: print(candidate.offsets) if __name__ == "__main__": main()