"""Evolutionary search over boolean expression trees built from XOR and AND.

Two tree representations live in this file:

* "f" trees -- tuples ``(inverted, indices, children)`` (or, in the binary
  codec ``encode_f`` / ``decode_f``, ``(inverted, flips, child_or_None)``).
* "model" trees -- tuples ``(inverted, xors, child, scratch, touched)`` where
  ``xors`` is a length-``N`` 0/1 vector, ``child`` is ``None`` or
  ``(child_inverted, left, right)``, and ``scratch`` / ``touched`` are
  per-node work buffers used by ``evaluate_sample``.

``main`` evolves model trees and scores them against the target bit function.
"""
import hashlib
import math
import numpy as np
import random
from struct import pack, pack_into, unpack_from
import secrets
from numpy import hamming

# Number of input bits every candidate function operates on.
N = 8


def bit_at_index(buffer, index):
    """Return True when bit ``index`` of ``buffer`` is set.

    Bits are numbered least-significant-first inside each byte, and the byte
    offset wraps around modulo ``len(buffer)``.
    """
    offset = (index >> 3) % len(buffer)
    return buffer[offset] & (1 << (index & 0b111)) != 0


def count_one_bits(n):
    """Return the number of ``1`` digits in the binary representation of ``n``."""
    return bin(n).count("1")


def hamming_distance(a, b, scratch=None):
    """Return the number of positions where ``a`` and ``b`` differ.

    ``scratch`` is an optional pre-allocated output buffer of the same shape
    as ``a``; it is overwritten with the element-wise XOR.  When omitted a
    temporary float buffer is allocated, so callers that do not care about
    allocation can use the two-argument form.
    """
    if scratch is None:
        scratch = np.zeros(np.shape(a))
    np.logical_xor(a, b, scratch)
    return np.sum(scratch)


def _add_random_index(indices):
    """Add one uniformly chosen index from ``range(N)`` not yet in ``indices``.

    Callers guarantee ``len(indices) < N``, so at least one index is free.
    """
    available_indices = [i for i in range(N) if i not in indices]
    indices.add(random.choice(available_indices))


def encode_f(f, buffer, offset=0):
    """Serialise tree ``f`` into ``buffer`` starting at ``offset``.

    ``f`` is ``(inverted, flips, child)`` with ``child`` either ``None`` or
    ``(gate_inverted, left, right)``.  The stream is a sequence of native
    unsigned ints: the invert flag, then ``0, index`` per flip, then either
    ``1`` (leaf terminator) or ``2`` / ``3`` (gate, plain / inverted) followed
    by the encoded left and right sub-trees.

    Returns the offset just past the last byte written.
    """
    (inverted, flips, child) = f
    pack_into('I', buffer, offset, inverted)
    offset += 4
    for index in flips:
        # Opcode 0 announces a flip; the flipped bit index follows it.
        pack_into('I', buffer, offset, 0)
        offset += 4
        pack_into('I', buffer, offset, index)
        offset += 4
    if child is None:
        pack_into('I', buffer, offset, 1)
        return offset + 4
    (gate_inverted, left, right) = child
    pack_into('I', buffer, offset, 3 if gate_inverted else 2)
    offset += 4
    offset = encode_f(left, buffer, offset)
    offset = encode_f(right, buffer, offset)
    return offset


def generate_random_branch(p_mutation):
    """Build a random ``(inverted, indices, children)`` tree.

    ``p_mutation`` scales the probabilities of adding indices and children.
    Child sub-trees are generated with the (smaller) child probability, which
    makes deeper levels progressively less likely to branch.
    """
    p_add_indices = p_mutation * random.random()
    p_add_children = p_mutation * random.random()
    inverted = random.randint(0, 1)
    indices = set()
    children = []

    # randomly add indices
    while random.random() < p_add_indices and len(indices) < N:
        _add_random_index(indices)

    # randomly add children
    while random.random() < p_add_children:
        child_inverted = random.randint(0, 1)
        left = generate_random_branch(p_add_children)
        right = generate_random_branch(p_add_children)
        children.append((child_inverted, left, right))
    return (inverted, indices, children)


def mutate_f(f, p_mutation):
    """Return a mutated copy of the ``(inverted, indices, children)`` tree ``f``.

    Each kind of mutation (invert, drop/add index, drop/clone/mutate/add
    child) gets its own probability, drawn as ``p_mutation * random.random()``.
    The input tree's index set and child list are copied, not modified.
    """
    (inverted, indices, children) = f
    mutated_indices = set(indices)
    mutated_children = children[:]
    p_invert = p_mutation * random.random()
    p_drop_indices = p_mutation * random.random()
    p_add_indices = p_mutation * random.random()
    p_drop_children = p_mutation * random.random()
    p_mutate_child = p_mutation * random.random()
    p_clone_child = p_mutation * random.random()
    p_invert_child = p_mutation * random.random()
    p_add_children = p_mutation * random.random()

    # randomly invert
    if random.random() < p_invert:
        inverted ^= 1

    # randomly drop indices
    while random.random() < p_drop_indices and len(mutated_indices) > 0:
        mutated_indices.pop()

    # randomly add indices
    while random.random() < p_add_indices and len(mutated_indices) < N:
        _add_random_index(mutated_indices)

    # randomly drop children
    while random.random() < p_drop_children and len(mutated_children) > 0:
        del mutated_children[random.randrange(len(mutated_children))]

    # randomly clone children (the clone is itself mutated)
    while random.random() < p_clone_child and len(mutated_children) > 0:
        (child_inverted, left, right) = random.choice(mutated_children)
        if random.random() < p_invert_child:
            child_inverted ^= 1
        clone = (child_inverted, mutate_f(left, p_mutation), mutate_f(right, p_mutation))
        mutated_children.append(clone)

    # randomly mutate children in place
    while random.random() < p_mutate_child and len(mutated_children) > 0:
        index = random.randrange(len(mutated_children))
        (child_inverted, left, right) = mutated_children[index]
        if random.random() < p_invert_child:
            child_inverted ^= 1
        mutated_children[index] = (child_inverted, mutate_f(left, p_mutation), mutate_f(right, p_mutation))

    # randomly add children
    while random.random() < p_add_children:
        child_inverted = random.randint(0, 1)
        left = generate_random_branch(p_mutation)
        right = generate_random_branch(p_mutation)
        mutated_children.append((child_inverted, left, right))
    return (inverted, mutated_indices, mutated_children)


def decode_f(buffer, mutate = False, offset = 0, skip_invert = False):
    """Decode a tree written by ``encode_f``, optionally mutating it.

    Parameters:
        buffer: bytes-like object holding the encoded stream.
        mutate: when true, random mutations are applied while decoding.
        offset: byte position to start reading from.
        skip_invert: when true, no invert flag is read and it defaults to 0.

    Returns ``(next_offset, (inverted, flips, child))`` where ``flips`` is a
    set of bit indices and ``child`` is ``None`` or
    ``(gate_inverted, left, right)``.
    """
    inverted = 0
    if not skip_invert:
        [inverted] = unpack_from('I', buffer, offset)
        offset += 4
    # random invert
    if mutate and random.random() < 0.01:
        inverted ^= 1
    inverted &= 0b1
    flips = set()
    # random add flip
    while mutate and random.random() < 0.5 and len(flips) < N:
        _add_random_index(flips)
    while offset < len(buffer):
        # random create branch
        if mutate and random.random() < 0.01:
            gate_inverted = random.randint(0, 1)
            # NOTE(review): generate_random_branch requires a p_mutation
            # argument, so this call raises TypeError whenever this mutation
            # fires; it also returns a list-of-children tree rather than the
            # single-child shape used by this codec.  Left unchanged until the
            # intended behaviour is confirmed.
            left = generate_random_branch()
            (offset, right) = decode_f(buffer, mutate, offset, True)
            return (offset, (inverted, flips, (gate_inverted, left, right)))
        [opcode] = unpack_from('I', buffer, offset)
        offset += 4
        opcode &= 0b11
        if opcode == 0:
            [index] = unpack_from('I', buffer, offset)
            offset += 4
            # random skip flip
            if mutate and random.random() < 0.01:
                continue
            # A repeated index cancels itself out (XOR semantics).
            if index in flips:
                flips.remove(index)
            else:
                flips.add(index)
        elif opcode == 1:
            return (offset, (inverted, flips, None))
        else:
            (offset, left) = decode_f(buffer, mutate, offset)
            (offset, right) = decode_f(buffer, mutate, offset)
            gate_inverted = 0 if opcode == 2 else 1
            # random invert
            if mutate and random.random() < 0.01:
                gate_inverted ^= 1
            # random skip branch
            if mutate and random.random() < 0.01:
                return (offset, (inverted, flips, None))
            return (offset, (inverted, flips, (gate_inverted, left, right)))
    # The buffer ended without a terminator opcode: keep whatever flips were
    # decoded instead of silently discarding them.
    return (offset, (inverted, flips, None))


def generate_program(f):
    """Return a Python expression string computing tree ``f`` on a buffer ``x``.

    ``f`` is ``(inverted, indices, children)`` where each child is
    ``(gate_inverted, left, right)``.  Every term is XOR-ed together; a child
    contributes the AND of its two sub-expressions.
    """
    (inverted, indices, children) = f
    parts = []
    if inverted:
        parts.append("1^")
    parts.append("(")
    for i in indices:
        parts.append("(x[" + str(i) + ">>3]&(1<<(" + str(i) + "&0b111))!=0)^")
    for (gate_inverted, left, right) in children:
        if gate_inverted:
            parts.append("1^")
        parts.append("((" + generate_program(left) + ")&(" + generate_program(right) + "))^")
    parts.append("0)")
    return "".join(parts)


def compile_f(f):
    """Compile tree ``f`` into a callable ``f(x)`` via ``generate_program``.

    The generated source is built only from integers held in the tree; it is
    executed with ``exec``, so trees must not come from untrusted input.
    """
    program = 'def f(x):\n\treturn ' + generate_program(f)
    scope = {}
    exec(program, scope)
    return scope['f']


def evaluate(model, x, value = 0):
    """Interpret the ``(inverted, indices, children)`` tree ``model`` on ``x``.

    ``value`` is the starting parity; each set bit named in ``indices``, each
    child whose ``left & right`` differs from its invert flag, and the node's
    own invert flag toggle it.  Returns the final 0/1 value.
    """
    (inverted, indices, children) = model
    for i in indices:
        if bit_at_index(x, i) != 0:
            value ^= 1
    for (child_inverted, left, right) in children:
        left_value = evaluate(left, x)
        right_value = evaluate(right, x)
        if left_value & right_value != child_inverted:
            value ^= 1
    if inverted:
        value ^= 1
    return value


def encode(v):
    """Pack the 0/1 sequence ``v`` into bytes, most-significant bit first.

    ``ceil(N / 8)`` bytes are produced; positions past the end of ``v`` are
    treated as zero bits.
    """
    byte_values = []
    for i in range(math.ceil(N / 8)):
        x = 0
        for j in range(8):
            index = i * 8 + j
            x <<= 1
            if index < len(v):
                x |= int(v[index])
        byte_values.append(x)
    # bytearray(int) would create a zero-filled buffer of that length, so the
    # collected list of byte values must be passed here.
    return bytearray(byte_values)


def sha(v):
    """Return the low bit of the first SHA-256 digest byte of ``encode(v)``."""
    m = hashlib.sha256()
    m.update(encode(v))
    result = m.digest()
    return result[0] & 0b1


def xor(x):
    """Return the parity (0 or 1) of all one-bits across the integers in ``x``."""
    num_one_bits = 0
    for n in x:
        num_one_bits += count_one_bits(n)
    return num_one_bits % 2


def random_sample(m, n):
    """Return an ``(m, n)`` float array filled with random 0/1 values."""
    inputs = np.zeros((m, n))
    for i in range(m):
        for j in range(n):
            inputs[i][j] = random.randint(0, 1)
    return inputs


def update_sample(sample, index):
    """Overwrite the first ``N`` entries of row ``index`` with random 0/1 values."""
    for j in range(N):
        sample[index][j] = random.randint(0, 1)


def coherence(inputs, outputs):
    """Return the mean neighbourhood agreement of ``outputs`` over ``inputs``.

    For every sample, every other sample is weighted by ``2 ** -distance``
    (Hamming distance between the inputs); the sample's coherence is the
    weight fraction belonging to samples with an equal output.  The result is
    the average over all samples, or 0 for an empty input.
    """
    count = len(inputs)
    if count == 0:
        return 0
    per_sample = []
    for i in range(count):
        x_a = inputs[i]
        y_a = outputs[i]
        numerator = 0
        denominator = 0
        for j in range(count):
            if i == j:
                continue
            distance = hamming_distance(x_a, inputs[j])
            weight = 1.0 / (2 ** distance)
            denominator += weight
            if y_a == outputs[j]:
                numerator += weight
        per_sample.append(numerator / denominator if denominator > 0 else 0)
    return sum(per_sample) / len(per_sample)


def score(f, sample, distances):
    """Score ``f`` on ``sample`` by delegating to ``coherence``.

    NOTE(review): this passes a list of ``(x, f(x) ^ y)`` pairs as ``inputs``
    and ``distances`` as ``outputs``, which does not match what ``coherence``
    does with its arguments.  Nothing in this file calls it; confirm the
    intended contract before use.
    """
    return coherence([(x, f(x) ^ y) for (x, y) in sample], distances)


def compute_distances(inputs, distances, scratch):
    """Fill ``distances`` with ``2 ** -hamming`` for every pair of input rows.

    The matrix is symmetric with a zero diagonal.  ``scratch`` is a work
    buffer with the shape of one input row.
    """
    count = len(inputs)
    for i in range(count):
        a = inputs[i]
        distances[i][i] = 0
        for j in range(i + 1, count):
            distance = 2.0 ** -hamming_distance(a, inputs[j], scratch)
            distances[i][j] = distance
            distances[j][i] = distance


def update_distances(inputs, distances, i, scratch):
    """Recompute row and column ``i`` of ``distances`` after row ``i`` changed."""
    a = inputs[i]
    for j in range(len(inputs)):
        if i == j:
            distances[i][j] = 0
            continue
        distance = 2.0 ** -hamming_distance(a, inputs[j], scratch)
        distances[i][j] = distance
        distances[j][i] = distance


def evaluate_sample(model, sample, output):
    """Evaluate ``model`` on every row of ``sample`` and store it in ``output``.

    A node's value is the parity of ``sample @ xors``, XOR its invert flag,
    and -- when it has a child gate -- XOR ``(left AND right)`` XOR the gate's
    invert flag.  Each node writes its result into its own ``scratch`` buffer
    and marks ``touched[0]``; ``output`` doubles as temporary storage for the
    gate product.  All ``touched`` flags are cleared again before returning.
    """
    (_, _, _, root_scratch, _) = model
    stack = [model]
    while stack:
        layer = stack.pop()
        (inverted, xors, child, scratch, touched) = layer
        if child is None:
            np.matmul(sample, xors, out=scratch)
            np.mod(scratch, 2, out=scratch)
            if inverted == 1:
                np.logical_xor(1, scratch, out=scratch)
            touched[0] = 1
            continue
        (child_inverted, left, right) = child
        (_, _, _, left_scratch, left_touched) = left
        (_, _, _, right_scratch, right_touched) = right
        if left_touched[0] and right_touched[0]:
            np.multiply(left_scratch, right_scratch, out=output)
            np.matmul(sample, xors, out=scratch)
            np.mod(scratch, 2, out=scratch)
            if inverted:
                np.logical_xor(scratch, 1, out=scratch)
            if child_inverted:
                np.logical_xor(output, 1, out=output)
            np.logical_xor(scratch, output, out=scratch)
            touched[0] = 1
        else:
            # Post-order: revisit this node only after both children, which
            # sit above it on the LIFO stack, have been evaluated.
            stack.append(layer)
            stack.append(left)
            stack.append(right)
    np.copyto(output, root_scratch)
    reset_model(model)


def reset_model(model):
    """Clear the ``touched`` flag on every node of ``model``."""
    stack = [model]
    while stack:
        (_, _, child, _, touched) = stack.pop()
        touched[0] = 0
        if child is not None:
            (_, left, right) = child
            stack.append(left)
            stack.append(right)


def clone_model(model, p_mutation):
    """Return a deep, possibly mutated copy of ``model``.

    Mutation probabilities are drawn as ``p_mutation * random.random()`` for:
    toggling the node's invert flag, toggling the child gate's invert flag,
    flipping each entry of ``xors``, and growing a child gate on a leaf.
    Dropping a child is currently disabled (its probability is fixed at 0).
    Fresh ``scratch`` / ``touched`` buffers are allocated for the clone.
    """
    p_invert = p_mutation * random.random()
    p_invert_child = p_mutation * random.random()
    p_flip = p_mutation * random.random()
    p_add_child = p_mutation * random.random()
    # Disabled alternative: p_mutation * random.random() * 0.5
    p_drop_child = 0

    (inverted, xors, child, scratch, touched) = model
    if random.random() < p_invert:
        inverted ^= 1
    clone_xors = np.zeros((N,))
    np.copyto(clone_xors, xors)
    for i in range(N):
        if random.random() < p_flip:
            clone_xors[i] = int(clone_xors[i]) ^ 1
    clone_scratch = np.zeros(np.shape(scratch))
    clone_touched = np.zeros(np.shape(touched))

    if child is None:
        if random.random() < p_add_child:
            sample_size = len(scratch)
            child_inverted = random.randint(0, 1)
            left = random_child(sample_size, p_mutation)
            right = random_child(sample_size, p_mutation)
            return (inverted, clone_xors, (child_inverted, left, right), clone_scratch, clone_touched)
        return (inverted, clone_xors, None, clone_scratch, clone_touched)

    if random.random() < p_drop_child:
        return (inverted, clone_xors, None, clone_scratch, clone_touched)

    (child_inverted, left, right) = child
    if random.random() < p_invert_child:
        # Toggle the gate's flag, not the node's own invert flag.
        child_inverted ^= 1
    clone_left = clone_model(left, p_mutation)
    clone_right = clone_model(right, p_mutation)
    return (inverted, clone_xors, (child_inverted, clone_left, clone_right), clone_scratch, clone_touched)


def random_child(sample_size, p_mutation):
    """Return a random leaf model with at least one bit set in ``xors``.

    One index is always set; every index is additionally set with probability
    ``p_mutation * random.random()``.  Recursive child generation is disabled,
    so the result never has a child gate.
    """
    inverted = random.randint(0, 1)
    xors = np.zeros((N,))
    scratch = np.zeros((sample_size,))
    touched = np.zeros((1,))
    p_flip = p_mutation * random.random()

    xors[random.randint(0, N - 1)] = 1
    for i in range(N):
        if random.random() < p_flip:
            xors[i] = 1
    return (inverted, xors, None, scratch, touched)


def size(model):
    """Return a size measure: ``sum(xors)`` plus ``size(left) * size(right)``."""
    (_, xors, child, _, _) = model
    xor_size = np.sum(xors)
    if child is not None:
        (_, left, right) = child
        return xor_size + size(left) * size(right)
    return xor_size


def null_candidate(sample_size):
    """Return a leaf model with all-zero ``xors`` and fresh work buffers."""
    return (0, np.zeros((N,)), None, np.zeros((sample_size,)), np.zeros((1,)))


def main():
    """Run the evolutionary loop and print progress once per epoch.

    Every epoch scores all candidates on the current sample, keeps the best
    ``num_survivors`` and refills the population with mutated clones.  When
    the top score equals the previous epoch's, a fresh sample is drawn.
    """
    epochs = 10000
    num_survivors = 100
    num_offspring = 10
    num_candidates = num_survivors + num_survivors * num_offspring
    sample_size = 32
    p_mutation = 0.5
    g = sha

    current_generation = [null_candidate(sample_size) for _ in range(num_candidates)]

    distances = np.zeros((sample_size, sample_size))
    output_equality = np.zeros((sample_size, sample_size))
    inputs = random_sample(sample_size, N)
    scratch = np.zeros(N,)
    compute_distances(inputs, distances, scratch)
    expected_outputs = np.zeros((sample_size,))
    for i in range(sample_size):
        expected_outputs[i] = g(inputs[i])
    outputs = np.zeros((sample_size,))
    output_xor = np.zeros((sample_size,))
    ones = np.ones((sample_size,))
    numerators = np.zeros((sample_size,))
    denominators = np.zeros((sample_size,))
    coherences = np.zeros((sample_size,))
    # Column sums of the weight matrix: total neighbour weight per sample.
    np.matmul(ones, distances, out=denominators)
    scores = np.zeros((num_candidates,))
    max_score = 0
    last_score = 0

    for epoch in range(epochs):
        for i in range(num_candidates):
            candidate = current_generation[i]
            evaluate_sample(candidate, inputs, outputs)
            np.logical_xor(outputs, expected_outputs, output_xor)
            # output_equality[p][q] is 1 where the residuals of samples p and
            # q agree, else 0.
            np.equal(output_xor[:, None], output_xor[None, :], out=output_equality)
            np.multiply(output_equality, distances, out=output_equality)
            np.matmul(ones, output_equality, out=numerators)
            np.divide(numerators, denominators, out=coherences)
            scores[i] = np.average(coherences)

        # Indices of the best candidates, ascending by score (best is last).
        top_n = sorted(range(len(scores)), key=lambda i: scores[i])[-num_survivors:]
        survivors = [current_generation[index] for index in top_n]

        top_score = scores[top_n[-1]]
        print(epoch, top_score, size(survivors[-1]))

        # NOTE(review): p_mutation is adapted here but never passed to
        # clone_model below, which receives random.random() instead --
        # confirm which one is intended.
        if top_score <= max_score:
            p_mutation += 0.01
        else:
            p_mutation = 0.5
            max_score = top_score

        for i in range(num_survivors):
            current_generation[i] = survivors[i]

        for i in range(num_survivors):
            candidate = survivors[i]
            for j in range(num_offspring):
                index = num_survivors + j * num_survivors + i
                current_generation[index] = clone_model(candidate, random.random())

        # Draw a completely new sample when the top score did not change.
        # (Alternatives tried and disabled in the past: resampling with
        # probability 0.5, requiring a streak of 4 equal scores, and
        # replacing a single row via update_sample / update_distances.)
        if last_score == top_score:
            inputs = random_sample(sample_size, N)
            compute_distances(inputs, distances, scratch)
            np.matmul(ones, distances, out=denominators)
            for i in range(sample_size):
                expected_outputs[i] = g(inputs[i])

        last_score = top_score


if __name__ == "__main__":
    main()