"""Evolve small mod-M arithmetic circuits that approximate a target function g
(by default the first SHA-256 digest byte mod M) by maximizing a Hamming-weighted
"coherence" score of their error pattern over random input samples."""

import hashlib
import math
import random
from struct import pack_into, unpack_from

import numpy as np

N = 32
M = 2


def bit_at_index(buffer, index):
    # least-significant-bit-first addressing within each byte
    offset = (index >> 3) % len(buffer)
    return buffer[offset] & (1 << (index & 0b111)) != 0


def count_one_bits(n):
    return bin(n).count("1")


def hamming_distance(a, b, scratch):
    # scratch receives the elementwise XOR; summing it counts differing positions
    np.logical_xor(a, b, scratch)
    return sum(scratch)


def encode_f(f, buffer, offset=0):
    (inverted, flips, child) = f
    pack_into('I', buffer, offset, inverted)
    offset += 4
    for index in flips:
        pack_into('I', buffer, offset, 0)
        offset += 4
        pack_into('I', buffer, offset, index)
        offset += 4
    if child is None:
        pack_into('I', buffer, offset, 1)
        offset += 4
        return offset
    (inverted, left, right) = child
    pack_into('I', buffer, offset, 2 if not inverted else 3)
    offset += 4
    offset = encode_f(left, buffer, offset)
    offset = encode_f(right, buffer, offset)
    return offset


def generate_random_branch(p_mutation):
    global N
    p_add_indices = p_mutation * random.random()
    p_add_children = p_mutation * random.random()
    inverted = random.randint(0, 1)
    indices = set()
    children = []
    # randomly add indices
    while random.random() < p_add_indices and len(indices) < N:
        available_indices = [i for i in range(0, N) if i not in indices]
        if len(available_indices) == 1:
            indices.add(available_indices[0])
            continue
        indices.add(available_indices[random.randint(0, len(available_indices) - 1)])
    # randomly add children
    while random.random() < p_add_children:
        child_inverted = random.randint(0, 1)
        left = generate_random_branch(p_add_children)
        right = generate_random_branch(p_add_children)
        children.append((child_inverted, left, right))
    return (inverted, indices, children)
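# A note on the serialized form (inferred from encode_f/decode_f above and
# below, not stated in the original): branches are exchanged as a stream of
# native-endian 32-bit words ('I' struct format). The first word is the
# inverted flag; after that, opcode 0 means "a flip index follows", opcode 1
# terminates a leaf, and opcodes 2 and 3 introduce an AND gate (3 = inverted
# gate) followed by two recursively encoded child branches.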
def mutate_f(f, p_mutation):
    global N
    (inverted, indices, children) = f
    mutated_indices = set(indices)
    mutated_children = children[:]
    p_invert = p_mutation * random.random()
    p_drop_indices = p_mutation * random.random()
    p_add_indices = p_mutation * random.random()
    p_drop_children = p_mutation * random.random()
    p_mutate_child = p_mutation * random.random()
    p_clone_child = p_mutation * random.random()
    p_invert_child = p_mutation * random.random()
    p_add_children = p_mutation * random.random()
    # randomly invert
    if random.random() < p_invert:
        inverted ^= 1
    # randomly drop indices
    while random.random() < p_drop_indices and len(mutated_indices) > 0:
        mutated_indices.pop()
    # randomly add indices
    while random.random() < p_add_indices and len(mutated_indices) < N:
        available_indices = [i for i in range(0, N) if i not in mutated_indices]
        if len(available_indices) == 1:
            mutated_indices.add(available_indices[0])
            continue
        mutated_indices.add(available_indices[random.randint(0, len(available_indices) - 1)])
    # randomly drop children
    while random.random() < p_drop_children and len(mutated_children) > 0:
        if len(mutated_children) == 1:
            del mutated_children[0]
            break
        del mutated_children[random.randint(0, len(mutated_children) - 1)]
    # randomly clone children
    while random.random() < p_clone_child and len(mutated_children) > 0:
        index = 0 if len(mutated_children) == 1 else random.randint(0, len(mutated_children) - 1)
        (child_inverted, left, right) = mutated_children[index]
        if random.random() < p_invert_child:
            child_inverted ^= 1
        clone = (child_inverted, mutate_f(left, p_mutation), mutate_f(right, p_mutation))
        mutated_children.append(clone)
    # randomly mutate children
    while random.random() < p_mutate_child and len(mutated_children) > 0:
        index = 0 if len(mutated_children) == 1 else random.randint(0, len(mutated_children) - 1)
        (child_inverted, left, right) = mutated_children[index]
        if random.random() < p_invert_child:
            child_inverted ^= 1
        mutated_children[index] = (child_inverted, mutate_f(left, p_mutation), mutate_f(right, p_mutation))
    # randomly add children
    while random.random() < p_add_children:
        child_inverted = random.randint(0, 1)
        left = generate_random_branch(p_mutation)
        right = generate_random_branch(p_mutation)
        mutated_children.append((child_inverted, left, right))
    return (inverted, mutated_indices, mutated_children)


def decode_f(buffer, mutate=False, offset=0, skip_invert=False):
    global N
    inverted = 0
    if not skip_invert:
        [inverted] = unpack_from('I', buffer, offset)
        offset += 4
    # random invert
    if mutate and random.random() < 0.01:
        inverted ^= 1
    inverted &= 0b1
    flips = set()
    # random add flip
    while mutate and random.random() < 0.5 and len(flips) < N:
        available_indices = [i for i in range(0, N) if i not in flips]
        if len(available_indices) == 1:
            flips.add(available_indices[0])
            continue
        flips.add(available_indices[random.randint(0, len(available_indices) - 1)])
    while offset < len(buffer):
        # random create branch
        if mutate and random.random() < 0.01:
            gate_inverted = random.randint(0, 1)
            # 0.5 is an assumed mutation rate; the original call passed no
            # argument, which would raise a TypeError
            left = generate_random_branch(0.5)
            (offset, right) = decode_f(buffer, mutate, offset, True)
            return (offset, (inverted, flips, (gate_inverted, left, right)))
        [opcode] = unpack_from('I', buffer, offset)
        offset += 4
        opcode &= 0b11
        if opcode == 0:
            [index] = unpack_from('I', buffer, offset)
            offset += 4
            # random skip flip
            if mutate and random.random() < 0.01:
                continue
            if index in flips:
                flips.remove(index)
            else:
                flips.add(index)
        elif opcode == 1:
            return (offset, (inverted, flips, None))
        else:
            (offset, left) = decode_f(buffer, mutate, offset)
            (offset, right) = decode_f(buffer, mutate, offset)
            gate_inverted = 0 if opcode == 2 else 1
            # random invert
            if mutate and random.random() < 0.01:
                gate_inverted ^= 1
            # random skip branch
            if mutate and random.random() < 0.01:
                return (offset, (inverted, flips, None))
            return (offset, (inverted, flips, (gate_inverted, left, right)))
    return (offset, (inverted, [], None))
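# A minimal sanity check for the encode_f / decode_f pair (illustrative; this
# helper is not part of the original program, and the buffer size is an
# arbitrary assumption).
def _roundtrip_example():
    # f is (inverted, flips, child) where child is None or (gate_inverted, left, right)
    f = (1, {2}, (0, (0, {1, 5}, None), (0, {7}, None)))
    buffer = bytearray(4096)
    end = encode_f(f, buffer)
    (_, decoded) = decode_f(buffer[:end])
    assert decoded == f
    return decoded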
def generate_program(model, output_var='output'):
    global N, M
    (constant, indices, child) = model
    # indices is a set; it must be converted before np.array2string can format it
    indices_string = np.array2string(np.array(sorted(indices)), separator=',')
    statement = 'multiply(' + indices_string + ', x, temp)\n\t'
    statement += output_var + '=' + str(constant) + '+sum(temp)\n\t'
    if child is not None:
        left_output = output_var + '0'
        right_output = output_var + '1'
        (left, right) = child
        statement += generate_program(left, left_output)
        statement += generate_program(right, right_output)
        statement += output_var + '+=' + left_output + '*' + right_output + '\n\t'
    statement += output_var + '%=' + str(M) + '\n\t'
    return statement


def compile(model):
    program = 'def f(x, temp):\n\t' + generate_program(model) + 'return output'
    scope = {'multiply': np.multiply, 'sum': np.sum}
    exec(program, scope)
    return scope['f']


def evaluate(model, x, value=0):
    (inverted, indices, children) = model
    for i in indices:
        if bit_at_index(x, i) != 0:
            value ^= 1
    for child in children:
        (child_inverted, left, right) = child
        left = evaluate(left, x)
        right = evaluate(right, x)
        if left & right != child_inverted:
            value ^= 1
    if inverted:
        value ^= 1
    return value


def encode(v):
    byte_values = []
    for i in range(0, math.ceil(N / 8)):
        x = 0
        for j in range(0, 8):
            index = i * 8 + j
            x <<= 1
            x |= int(v[index])
        byte_values.append(x)
    # the original returned bytearray(x), which builds x zero bytes
    # instead of the packed values
    return bytearray(byte_values)


def sha(v):
    global M
    x = encode(v)
    m = hashlib.sha256()
    m.update(x)
    result = m.digest()
    return result[0] % M


def xor(x):
    num_one_bits = 0
    for n in x:
        # int(): sample rows are float arrays, and bin() requires an integer
        num_one_bits += count_one_bits(int(n))
    return num_one_bits % 2


def random_sample(m, n):
    inputs = np.zeros((m, n))
    for i in range(0, m):
        for j in range(0, n):
            inputs[i][j] = random.randint(0, 1)
    return inputs


def update_sample(sample, index):
    global N
    for j in range(0, N):
        sample[index][j] = random.randint(0, 1)


def coherence(inputs, outputs, scratch):
    coherences = []
    for i in range(0, len(inputs)):
        x_a = inputs[i]
        y_a = outputs[i]
        numerator = 0
        denominator = 0
        for j in range(0, len(inputs)):
            if i == j:
                continue
            x_b = inputs[j]
            y_b = outputs[j]
            distance = hamming_distance(x_a, x_b, scratch)
            weight = 1.0 / (2 ** distance)
            denominator += weight
            if y_a == y_b:
                numerator += weight
        coherence = numerator / denominator if denominator > 0 else 0
        coherences.append(coherence)
    return sum(coherences) / len(coherences)


def build_coherence_models(inputs, scratch):
    coherence_models = []
    for i in range(0, len(inputs)):
        x_a = inputs[i]
        distances = [hamming_distance(x_a, inputs[j], scratch) for j in range(0, len(inputs))]
        indices = sorted(range(len(distances)), key=lambda k: distances[k])
        lowest = -1
        denominator = 0
        components = []
        for index in range(0, len(indices)):
            j = indices[index]
            if distances[j] == 0:
                continue
            if lowest < 0:
                lowest = distances[j]
            distance = distances[j] - lowest
            if distance >= 8:
                break
            weight = 2 ** -distance
            denominator += weight
            components.append((weight, j))
        coherence_models.append((denominator, components))
    return coherence_models


def fast_coherence(coherence_models, outputs):
    coherences = []
    for i in range(0, len(coherence_models)):
        (denominator, components) = coherence_models[i]
        numerator = 0
        for component in components:
            (weight, j) = component
            if outputs[i] == outputs[j]:
                numerator += weight
        coherence = numerator / denominator if denominator > 0 else 0
        coherences.append(coherence)
    return sum(coherences) / len(coherences)


def score(f, sample, scratch):
    # unused helper; the original passed arguments that did not match
    # coherence's (inputs, outputs, scratch) signature, so it is rewritten
    # here to the plausible intent
    inputs = [x for (x, y) in sample]
    outputs = [f(x) ^ y for (x, y) in sample]
    return coherence(inputs, outputs, scratch)


def compute_distances(inputs, distances, scratch):
    for i in range(0, len(inputs)):
        a = inputs[i]
        for j in range(i, len(inputs)):
            if i == j:
                distances[i][j] = 0
                continue
            b = inputs[j]
            distance = 2 ** -hamming_distance(a, b, scratch)
            distances[i][j] = distance
            distances[j][i] = distance


def update_distances(inputs, distances, i, scratch):
    a = inputs[i]
    for j in range(0, len(inputs)):
        if i == j:
            distances[i][j] = 0
            continue
        b = inputs[j]
        distance = 2 ** -hamming_distance(a, b, scratch)
        distances[i][j] = distance
        distances[j][i] = distance
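# Illustrative example (not part of the original): the coherence metric
# weights each pair of samples by 2**-hamming_distance, so near-identical
# inputs that disagree in output drag the score down the most. With two
# inputs at Hamming distance 1:
def _coherence_example():
    inputs = np.array([[0.0, 0.0, 0.0, 1.0],
                       [0.0, 0.0, 1.0, 1.0]])
    scratch = np.zeros(4)
    # identical outputs: every pairwise weight counts, so coherence is 1.0
    assert coherence(inputs, [0, 0], scratch) == 1.0
    # differing outputs: no weight counts, so coherence is 0.0
    assert coherence(inputs, [0, 1], scratch) == 0.0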
def clone_model(model, p_mutation):
    global N, M
    clone = model[:]
    p_insert_node = p_mutation * random.random()
    i = 0
    while i < len(clone):
        (bias, op, indices, (p_modify, p_bias, p_index)) = clone[i]
        p_modify_node = p_modify
        if random.random() < p_modify_node:
            p_modify += 0.01
            p_add_index = p_index
            p_modify_bias = p_bias
            indices = indices.copy()
            if random.random() < p_modify_bias:
                p_bias += 0.01
                bias += random.randint(0, M - 1)
                bias %= M
            else:
                p_bias -= 0.01
            for index in range(0, N + i):
                if random.random() < p_add_index:
                    p_index += 0.01
                    if index in indices:
                        indices.remove(index)
                    else:
                        indices.add(index)
                else:
                    p_index -= 0.01
        else:
            p_modify -= 0.01
        p_modify = min(max(0.01, p_modify), 0.99)
        p_bias = min(max(0.01, p_bias), 0.99)
        p_index = min(max(0.01, p_index), 0.99)
        clone[i] = (bias, op, indices, (p_modify, p_bias, p_index))
        i += 1
    if random.random() < p_insert_node:
        i = random.randint(0, len(clone))
        clone.insert(i, random_node(N + i - 1, p_mutation))
        # inserting a node shifts every later buffer slot up by one, so
        # node references at or past the insertion point must be reindexed
        for j in range(i + 1, len(clone)):
            (bias, op, indices, p) = clone[j]
            modified_indices = set()
            for index in indices:
                if index < N:
                    modified_indices.add(index)
                    continue
                shifted_index = index - N
                if shifted_index == i:
                    # references to the slot where the new node landed either
                    # stay on the new node or track the shifted original
                    if random.randint(0, 1) == 0:
                        modified_indices.add(index)
                    else:
                        modified_indices.add(index + 1)
                elif shifted_index > i:
                    modified_indices.add(index + 1)
                else:
                    modified_indices.add(index)
            clone[j] = (bias, op, modified_indices, p)
    return clone


def random_node(max_index, p_mutation):
    global N
    bias = random.randint(0, M - 1)
    op = random.randint(0, 1)
    p_modify = random.random()
    p_bias = random.random()
    p_index = random.random()
    indices = set()
    indices.add(random.randint(0, max_index))
    p_add_index = p_mutation * random.random()
    for index in range(0, max_index):
        if random.random() < p_add_index:
            indices.add(index)
    return (bias, op, indices, (p_modify, p_bias, p_index))


def null_candidate():
    global N
    return []


def encode_tree(tree_model):
    # Incomplete in the original (the second loop ended at
    # 'serialized_model.insert(0, )'). The completion below is an assumption:
    # child links are replaced by the indices assigned in the first pass, and
    # id() is used as the dict key since nodes may contain unhashable members.
    # The function is never called.
    stack = [tree_model]
    node_indices = {}
    index = 0
    while len(stack) > 0:
        node = stack.pop()
        node_indices[id(node)] = index
        index += 1
        (p, bias, value) = node
        if isinstance(value, int):
            continue
        (left, right) = value
        stack.append(left)
        stack.append(right)
    stack = [tree_model]
    serialized_model = []
    while len(stack) > 0:
        node = stack.pop()
        (p, bias, value) = node
        if isinstance(value, int):
            serialized_model.insert(0, (p, bias, value))
            continue
        (left, right) = value
        serialized_model.insert(0, (p, bias, (node_indices[id(left)], node_indices[id(right)])))
        stack.append(left)
        stack.append(right)
    return serialized_model


def eval_model(model, buffer, x):
    global N, M
    for i in range(0, len(model)):
        (bias, op, indices, _) = model[i]
        # op doubles as the identity element: 0 for sums, 1 for products
        value = op
        for index in indices:
            if index >= N + i:
                print('This should not happen')
            if op == 1:
                value *= x[index] if index < N else buffer[index - N]
                value %= M
            else:
                value += x[index] if index < N else buffer[index - N]
                value %= M
        value += bias
        value %= M
        if i == len(model) - 1:
            return value
        else:
            buffer[i] = value
    return 0


def size(model):
    return len(model)
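# Illustrative example (not part of the original): a two-node model evaluated
# by eval_model. Node format is (bias, op, indices, probabilities); op 0 sums
# and op 1 multiplies the selected values, all mod M. Index N refers to the
# first node's buffered output. The probability triples are placeholders.
def _eval_model_example():
    x = np.array([1.0, 1.0, 0.0, 1.0])  # assumes the indices used stay below 4
    buffer = np.zeros(4)
    model = [
        (0, 0, {0, 1}, (0.5, 0.5, 0.5)),   # node 0: (x[0] + x[1]) % M = 0
        (1, 1, {3, N}, (0.5, 0.5, 0.5)),   # node 1: (x[3] * node0 + 1) % M = 1
    ]
    return eval_model(model, buffer, x)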
def main():
    global N, M

    epochs = 10000
    num_survivors = 100
    num_offspring = 10
    num_candidates = num_survivors + num_survivors * num_offspring
    sample_size = 64
    eval_size = 100
    max_nodes = 65536
    p_mutation = 0.5
    g = sha

    current_generation = [null_candidate() for _ in range(0, num_candidates)]
    distances = np.zeros((sample_size, sample_size))
    output_equality = np.zeros((sample_size, sample_size))
    inputs = random_sample(sample_size, N)
    scratch = np.zeros(N)
    # compute_distances(inputs, distances, scratch)
    expected_outputs = np.zeros((sample_size,))
    for i in range(0, sample_size):
        expected_outputs[i] = g(inputs[i])
    outputs = np.zeros((sample_size,))
    output_xor = np.zeros((sample_size,))
    ones = np.ones((sample_size,))
    numerators = np.zeros((sample_size,))
    denominators = np.zeros((sample_size,))
    coherences = np.zeros((sample_size,))
    np.matmul(ones, distances, denominators)
    scores = np.zeros((num_candidates,))
    eval_buffer = np.zeros((max_nodes,))
    max_score = 0
    last_score = 0
    streak = 0

    coherence_models = build_coherence_models(inputs, scratch)

    for epoch in range(0, epochs):
        for i in range(0, num_candidates):
            candidate = current_generation[i]
            for j in range(0, sample_size):
                outputs[j] = eval_model(candidate, eval_buffer, inputs[j])
            np.subtract(outputs, expected_outputs, output_xor)
            np.mod(output_xor, M, output_xor)
            # for p in range(0, sample_size):
            #     for q in range(0, sample_size):
            #         m = int(output_xor[p])
            #         n = int(output_xor[q])
            #         distance = abs(m - n)
            #         if distance > M / 2:
            #             distance = M - distance
            #         distance /= (M / 2)
            #         distance **= 2
            #         output_equality[p][q] = distance
            #         # output_equality[p][q] = 1 if m == n else 0
            # np.multiply(output_equality, distances, output_equality)
            # np.matmul(ones, output_equality, numerators)
            # np.divide(numerators, denominators, coherences)
            # score = np.average(coherences)
            score = fast_coherence(coherence_models, output_xor)
            # if random.random() < 0.1:
            #     check = coherence(inputs, output_xor, scratch)
            #     if check - score > 1e-3:
            #         print('not equal')
            scores[i] = score
        top_n = sorted(range(len(scores)), key=lambda k: scores[k])[-num_survivors:]
        survivors = [current_generation[index] for index in top_n]
        # f = lambda x: evaluate(current_generation[0], x)
        # correct = 0
        # for i in range(0, eval_size):
        #     x = random_input()
        #     if f(x) == g(x):
        #         correct += 1
        top_score = scores[top_n[-1]]
        print(epoch, top_score, size(survivors[-1]))
        if top_score <= max_score:
            p_mutation += 0.01
        else:
            p_mutation = 0.5
            max_score = top_score
        for i in range(0, num_survivors):
            current_generation[i] = survivors[i]
        for i in range(0, num_survivors):
            candidate = survivors[i]
            for j in range(0, num_offspring):
                index = num_survivors + j * num_survivors + i
                current_generation[index] = clone_model(candidate, random.random() * 0.1)
        inputs = random_sample(sample_size, N)
        coherence_models = build_coherence_models(inputs, scratch)
        for i in range(0, sample_size):
            expected_outputs[i] = g(inputs[i])
        # while random.random() < 0.5:
        # if last_score == top_score:
        #     streak += 1
        # else:
        #     streak = 0
        # if streak >= 4:
        #     inputs = random_sample(sample_size, N)
        #     coherence_models = build_coherence_models(inputs, scratch)
        #     # compute_distances(inputs, distances, scratch)
        #     # np.matmul(ones, distances, denominators)
        #     for i in range(0, sample_size):
        #         expected_outputs[i] = g(inputs[i])
        #     streak = 0
        # expected_outputs = np.zeros((sample_size,))
        # for i in range(0, sample_size):
        #     expected_outputs[i] = g(inputs[i])
        # index = random.randint(0, sample_size - 1)
        # update_sample(inputs, index)
        # expected_outputs[index] = g(inputs[index])
        # update_distances(inputs, distances, index, scratch)
        # np.matmul(ones, distances, denominators)
        last_score = top_score


if __name__ == "__main__":
    main()