"""Evolutionary search for small XOR/AND tree models of a bit-vector function.

The module contains two generations of model representation:

* "f" trees used by ``encode_f`` / ``decode_f`` (single optional child) and by
  ``generate_random_branch`` / ``mutate_f`` / ``evaluate`` (list of children).
* ``(constant, xors, child)`` models used by ``generate_program``, ``compile``,
  ``clone_model``, ``random_child``, ``size`` and ``main``.

``main`` runs the search using the second representation and scores
candidates with ``fast_coherence``.
"""
import hashlib
import math
import random
import secrets  # noqa: F401  (unused here; kept for compatibility)
from struct import pack, pack_into, unpack_from  # noqa: F401

import numpy as np
from numpy import hamming  # noqa: F401  (unused here; kept for compatibility)

try:
    # Nothing in this module uses matplotlib; keep the name when the package
    # is installed, but do not make it a hard requirement for importing.
    from matplotlib import offsetbox  # noqa: F401
except ImportError:
    offsetbox = None

# Number of input bits per sample.
N = 32
# Modulus of the model arithmetic and of the target function's output.
M = 2


def bit_at_index(buffer, index):
    """Return True if bit ``index`` of ``buffer`` is set.

    Bits are numbered least-significant-first within each byte, and the byte
    offset wraps around the length of ``buffer``.
    """
    offset = (index >> 3) % len(buffer)
    return buffer[offset] & (1 << (index & 0b111)) != 0


def count_one_bits(n):
    """Return the number of ``1`` characters in ``bin(n)``."""
    return bin(n).count("1")


def hamming_distance(a, b, scratch):
    """Return the number of positions where ``a`` and ``b`` differ.

    ``scratch`` is an ndarray that receives the element-wise logical XOR of
    ``a`` and ``b``; it is overwritten.
    """
    np.logical_xor(a, b, scratch)
    return np.sum(scratch)


def encode_f(f, buffer, offset=0):
    """Serialize a single-child tree ``f`` into ``buffer`` starting at ``offset``.

    ``f`` is ``(inverted, flips, child)`` where ``child`` is ``None`` or
    ``(child_inverted, left, right)``. Every value is written as one native
    unsigned int (struct format ``'I'``):

    * the node's ``inverted`` flag,
    * for each flip index: opcode ``0`` followed by the index,
    * opcode ``1`` if there is no child, otherwise opcode ``2`` (child not
      inverted) or ``3`` (child inverted) followed by the encoded left and
      right subtrees.

    Returns the offset just past the last value written.
    """
    (inverted, flips, child) = f
    pack_into('I', buffer, offset, inverted)
    offset += 4
    for index in flips:
        pack_into('I', buffer, offset, 0)
        offset += 4
        pack_into('I', buffer, offset, index)
        offset += 4
    if child is None:
        pack_into('I', buffer, offset, 1)
        offset += 4
        return offset
    (child_inverted, left, right) = child
    pack_into('I', buffer, offset, 2 if not child_inverted else 3)
    offset += 4
    offset = encode_f(left, buffer, offset)
    offset = encode_f(right, buffer, offset)
    return offset


def _add_random_index(indices):
    """Add one index from ``range(N)`` that is not yet in the set ``indices``.

    The caller must ensure at least one such index exists.
    """
    available = [i for i in range(N) if i not in indices]
    indices.add(random.choice(available))


def _as_single_child_tree(branch):
    """Convert a children-list branch into the single-child tree format.

    ``encode_f`` can only serialize nodes with at most one child, so only the
    first entry of each children list is kept; the others are dropped.
    """
    (inverted, indices, children) = branch
    if not children:
        return (inverted, indices, None)
    (child_inverted, left, right) = children[0]
    return (
        inverted,
        indices,
        (
            child_inverted,
            _as_single_child_tree(left),
            _as_single_child_tree(right),
        ),
    )


def generate_random_branch(p_mutation):
    """Build a random ``(inverted, indices, children)`` branch.

    ``p_mutation`` scales the per-step probabilities of adding another index
    and of adding another child. Children are generated recursively with the
    (smaller) child probability, so the recursion terminates with
    probability 1.
    """
    p_add_indices = p_mutation * random.random()
    p_add_children = p_mutation * random.random()

    inverted = random.randint(0, 1)
    indices = set()
    children = []

    # randomly add indices
    while random.random() < p_add_indices and len(indices) < N:
        _add_random_index(indices)

    # randomly add children
    while random.random() < p_add_children:
        child_inverted = random.randint(0, 1)
        left = generate_random_branch(p_add_children)
        right = generate_random_branch(p_add_children)
        children.append((child_inverted, left, right))
    return (inverted, indices, children)


def mutate_f(f, p_mutation):
    """Return a randomly mutated copy of the children-list tree ``f``.

    ``f`` itself is not modified: its index set and children list are copied
    before being changed. Each kind of mutation gets its own probability,
    drawn as ``p_mutation * random.random()``.
    """
    (inverted, indices, children) = f
    mutated_indices = set(indices)
    mutated_children = children[:]

    p_invert = p_mutation * random.random()
    p_drop_indices = p_mutation * random.random()
    p_add_indices = p_mutation * random.random()
    p_drop_children = p_mutation * random.random()
    p_mutate_child = p_mutation * random.random()
    p_clone_child = p_mutation * random.random()
    p_invert_child = p_mutation * random.random()
    p_add_children = p_mutation * random.random()

    # randomly invert
    if random.random() < p_invert:
        inverted ^= 1

    # randomly drop indices
    while random.random() < p_drop_indices and mutated_indices:
        mutated_indices.pop()

    # randomly add indices
    while random.random() < p_add_indices and len(mutated_indices) < N:
        _add_random_index(mutated_indices)

    # randomly drop children
    while random.random() < p_drop_children and mutated_children:
        del mutated_children[random.randrange(len(mutated_children))]

    # randomly clone children (the clone is itself mutated)
    while random.random() < p_clone_child and mutated_children:
        index = random.randrange(len(mutated_children))
        (child_inverted, left, right) = mutated_children[index]
        if random.random() < p_invert_child:
            child_inverted ^= 1
        clone = (
            child_inverted,
            mutate_f(left, p_mutation),
            mutate_f(right, p_mutation),
        )
        mutated_children.append(clone)

    # randomly mutate children in place
    while random.random() < p_mutate_child and mutated_children:
        index = random.randrange(len(mutated_children))
        (child_inverted, left, right) = mutated_children[index]
        if random.random() < p_invert_child:
            child_inverted ^= 1
        mutated_children[index] = (
            child_inverted,
            mutate_f(left, p_mutation),
            mutate_f(right, p_mutation),
        )

    # randomly add children
    while random.random() < p_add_children:
        child_inverted = random.randint(0, 1)
        left = generate_random_branch(p_mutation)
        right = generate_random_branch(p_mutation)
        mutated_children.append((child_inverted, left, right))
    return (inverted, mutated_indices, mutated_children)


def decode_f(buffer, mutate=False, offset=0, skip_invert=False,
             p_mutation=0.5):
    """Decode a tree written by ``encode_f``, optionally mutating it.

    Args:
        buffer: bytes-like object holding the encoded tree.
        mutate: when true, random mutations are applied while decoding
            (flag inversion, extra flips, skipped flips, inserted or dropped
            branches).
        offset: byte offset at which decoding starts.
        skip_invert: when true, no ``inverted`` word is read for this node
            and its flag starts at 0. Used when a random branch is spliced
            in front of the remaining stream.
        p_mutation: probability scale passed to ``generate_random_branch``
            when a random branch is inserted.

    Returns:
        ``(offset, (inverted, flips, child))`` where ``offset`` is the
        position after the decoded node and ``child`` is ``None`` or
        ``(gate_inverted, left, right)``.
    """
    inverted = 0
    if not skip_invert:
        [inverted] = unpack_from('I', buffer, offset)
        offset += 4
    # random invert
    if mutate and random.random() < 0.01:
        inverted ^= 1
    inverted &= 0b1
    flips = set()
    # random add flip
    while mutate and random.random() < 0.5 and len(flips) < N:
        _add_random_index(flips)
    while offset < len(buffer):
        # random create branch: a fresh random subtree becomes the left
        # operand and the rest of the stream is decoded as the right one.
        if mutate and random.random() < 0.01:
            gate_inverted = random.randint(0, 1)
            # generate_random_branch yields the children-list format, which
            # encode_f cannot serialize, so convert it first.
            left = _as_single_child_tree(generate_random_branch(p_mutation))
            (offset, right) = decode_f(
                buffer, mutate, offset, True, p_mutation)
            return (offset, (inverted, flips, (gate_inverted, left, right)))
        [opcode] = unpack_from('I', buffer, offset)
        offset += 4
        opcode &= 0b11
        if opcode == 0:
            [index] = unpack_from('I', buffer, offset)
            offset += 4
            # random skip flip
            if mutate and random.random() < 0.01:
                continue
            # A repeated index cancels itself out.
            if index in flips:
                flips.remove(index)
            else:
                flips.add(index)
        elif opcode == 1:
            return (offset, (inverted, flips, None))
        else:
            (offset, left) = decode_f(
                buffer, mutate, offset, False, p_mutation)
            (offset, right) = decode_f(
                buffer, mutate, offset, False, p_mutation)
            gate_inverted = 0 if opcode == 2 else 1
            # random invert
            if mutate and random.random() < 0.01:
                gate_inverted ^= 1
            # random skip branch
            if mutate and random.random() < 0.01:
                return (offset, (inverted, flips, None))
            return (offset, (inverted, flips, (gate_inverted, left, right)))
    # The buffer ended without a terminating opcode.
    # NOTE(review): any flips decoded so far are discarded here; this
    # preserves the existing behaviour -- confirm it is intended.
    return (offset, (inverted, [], None))


def generate_program(model, output_var='output'):
    """Return Python source statements that evaluate ``model``.

    ``model`` is ``(constant, indices, child)`` with ``indices`` an ndarray
    of coefficients and ``child`` either ``None`` or ``(left, right)``. The
    generated code expects ``x`` and ``temp`` arrays plus ``multiply`` and
    ``sum`` in scope, and leaves the result in ``output_var``. Every
    statement ends with ``'\\n\\t'`` so the text can be used directly as a
    tab-indented function body.
    """
    eol = '\n\t'
    (constant, indices, child) = model
    coefficients = np.array2string(indices, separator=',')
    statement = 'multiply(' + coefficients + ', x, temp)' + eol
    statement += output_var + '=' + str(constant) + '+sum(temp)' + eol
    if child is not None:
        left_output = output_var + '0'
        right_output = output_var + '1'
        (left, right) = child
        statement += generate_program(left, left_output)
        statement += generate_program(right, right_output)
        statement += (
            output_var + '+=' + left_output + '*' + right_output + eol)
    statement += output_var + '%=' + str(M) + eol
    return statement


def compile(model):
    """Compile ``model`` into a callable ``f(x, temp)`` returning its output.

    The function body comes from ``generate_program`` and is executed with
    ``exec``; the source is built only from the model's own values.

    NOTE: this intentionally keeps its existing name even though it shadows
    the ``compile`` builtin inside this module.
    """
    program = 'def f(x, temp):\n\t' + generate_program(model) + 'return output'
    scope = {'multiply': np.multiply, 'sum': np.sum}
    exec(program, scope)
    return scope['f']


def evaluate(model, x, value=0):
    """Evaluate a children-list tree on the bit buffer ``x``.

    Starting from ``value``, the result is toggled once for every set bit
    named in ``indices``, once for every child whose
    ``left & right != child_inverted``, and once more if ``inverted`` is set.
    """
    (inverted, indices, children) = model
    for i in indices:
        if bit_at_index(x, i) != 0:
            value ^= 1
    for child in children:
        (child_inverted, left, right) = child
        left_value = evaluate(left, x)
        right_value = evaluate(right, x)
        if left_value & right_value != child_inverted:
            value ^= 1
    if inverted:
        value ^= 1
    return value


def encode(v):
    """Pack the first ``N`` entries of the 0/1 vector ``v`` into bytes.

    Bits are packed most-significant-first, eight per byte. When ``N`` is
    not a multiple of 8 the last byte is padded with zero bits.
    """
    byte_values = []
    for i in range(math.ceil(N / 8)):
        x = 0
        for j in range(8):
            index = i * 8 + j
            x <<= 1
            if index < N:
                x |= int(v[index]) & 1
        byte_values.append(x)
    # Build the buffer from the packed bytes; bytearray(int) would instead
    # create that many zero bytes.
    return bytearray(byte_values)


def sha(v):
    """Return the first SHA-256 digest byte of ``encode(v)``, modulo ``M``."""
    m = hashlib.sha256()
    m.update(encode(v))
    return m.digest()[0] % M


def xor(x):
    """Return the parity of the total number of one-bits in the ints of ``x``."""
    num_one_bits = 0
    for n in x:
        num_one_bits += count_one_bits(n)
    return num_one_bits % 2


def random_sample(m, n):
    """Return an ``(m, n)`` float array of random 0/1 values."""
    inputs = np.zeros((m, n))
    for i in range(m):
        for j in range(n):
            inputs[i][j] = random.randint(0, 1)
    return inputs


def update_sample(sample, index):
    """Overwrite the first ``N`` entries of row ``index`` with random 0/1 values."""
    for j in range(N):
        sample[index][j] = random.randint(0, 1)


def coherence(inputs, outputs, scratch):
    """Return the mean distance-weighted output agreement over all samples.

    For each sample, every other sample is weighted by ``2 ** -distance``
    (Hamming distance between inputs); the sample's coherence is the
    weighted fraction of other samples with the same output. Returns 0 for
    empty input.
    """
    coherences = []
    for i in range(len(inputs)):
        x_a = inputs[i]
        y_a = outputs[i]
        numerator = 0
        denominator = 0
        for j in range(len(inputs)):
            if i == j:
                continue
            distance = hamming_distance(x_a, inputs[j], scratch)
            weight = 1.0 / (2 ** distance)
            denominator += weight
            if y_a == outputs[j]:
                numerator += weight
        coherences.append(numerator / denominator if denominator > 0 else 0)
    if not coherences:
        return 0
    return sum(coherences) / len(coherences)


def build_coherence_models(inputs, scratch):
    """Precompute neighbour weights for ``fast_coherence``.

    For each sample, the other samples are visited in order of increasing
    Hamming distance. Samples at distance 0 are skipped. The remaining ones
    are weighted by ``2 ** -(distance - lowest)``, where ``lowest`` is the
    smallest non-zero distance, and the scan stops once
    ``distance - lowest`` reaches 8.

    Returns a list with one ``(denominator, [(weight, j), ...])`` entry per
    sample.
    """
    coherence_models = []
    for i in range(len(inputs)):
        x_a = inputs[i]
        distances = [
            hamming_distance(x_a, inputs[j], scratch)
            for j in range(len(inputs))
        ]
        nearest_first = sorted(range(len(distances)), key=distances.__getitem__)
        lowest = -1
        denominator = 0
        components = []
        for j in nearest_first:
            if distances[j] == 0:
                continue
            if lowest < 0:
                lowest = distances[j]
            distance = distances[j] - lowest
            if distance >= 8:
                break
            weight = 2 ** -distance
            denominator += weight
            components.append((weight, j))
        coherence_models.append((denominator, components))
    return coherence_models


def fast_coherence(coherence_models, outputs):
    """Return the mean coherence of ``outputs`` using precomputed weights.

    ``coherence_models`` is the result of ``build_coherence_models``.
    Returns 0 when there are no models.
    """
    coherences = []
    for i, (denominator, components) in enumerate(coherence_models):
        numerator = 0
        for (weight, j) in components:
            if outputs[i] == outputs[j]:
                numerator += weight
        coherences.append(numerator / denominator if denominator > 0 else 0)
    if not coherences:
        return 0
    return sum(coherences) / len(coherences)


def score(f, sample, distances):
    """Return the coherence of the residual ``f(x) ^ y`` over ``sample``.

    ``sample`` is an iterable of ``(x, y)`` pairs. ``distances`` is accepted
    for interface compatibility but is not used.
    """
    pairs = list(sample)
    if not pairs:
        return 0
    inputs = [x for (x, _) in pairs]
    outputs = [f(x) ^ y for (x, y) in pairs]
    scratch = np.zeros(len(inputs[0]))
    return coherence(inputs, outputs, scratch)


def compute_distances(inputs, distances, scratch):
    """Fill ``distances`` with ``2 ** -hamming_distance`` for every input pair.

    The matrix is written symmetrically and its diagonal is set to 0.
    """
    for i in range(len(inputs)):
        a = inputs[i]
        for j in range(i, len(inputs)):
            if i == j:
                distances[i][j] = 0
                continue
            distance = 2 ** -hamming_distance(a, inputs[j], scratch)
            distances[i][j] = distance
            distances[j][i] = distance


def update_distances(inputs, distances, i, scratch):
    """Recompute row and column ``i`` of ``distances`` after ``inputs[i]`` changed."""
    a = inputs[i]
    for j in range(len(inputs)):
        if i == j:
            distances[i][j] = 0
            continue
        distance = 2 ** -hamming_distance(a, inputs[j], scratch)
        distances[i][j] = distance
        distances[j][i] = distance


def evaluate_sample(model, sample, output):
    """Evaluate a buffered tree on every row of ``sample`` into ``output``.

    Each node is ``(inverted, xors, child, scratch, touched)`` where
    ``child`` is ``None`` or ``(child_inverted, left, right)``. A node's
    ``scratch`` receives ``(sample @ xors) mod 2``, XOR-ed with ``inverted``
    and, for inner nodes, with the (optionally inverted) product of the two
    child results. ``output`` doubles as temporary storage for that product
    and finally receives a copy of the root's ``scratch``. All ``touched``
    flags are zero on return.
    """
    # Collect nodes parent-first; walking the list backwards then guarantees
    # both children are evaluated before their parent.
    order = []
    pending = [model]
    while pending:
        layer = pending.pop()
        order.append(layer)
        child = layer[2]
        if child is not None:
            (_, left, right) = child
            pending.append(left)
            pending.append(right)

    for layer in reversed(order):
        (inverted, xors, child, scratch, touched) = layer
        if child is None:
            np.matmul(sample, xors, scratch)
            np.mod(scratch, 2, scratch)
            if inverted == 1:
                np.logical_xor(1, scratch, scratch)
        else:
            (child_inverted, left, right) = child
            left_scratch = left[3]
            right_scratch = right[3]
            np.multiply(left_scratch, right_scratch, output)
            np.matmul(sample, xors, scratch)
            np.mod(scratch, 2, scratch)
            if inverted:
                np.logical_xor(scratch, 1, scratch)
            if child_inverted:
                np.logical_xor(output, 1, output)
            np.logical_xor(scratch, output, scratch)
        touched[0] = 1

    root_scratch = model[3]
    np.copyto(output, root_scratch)
    reset_model(model)


def reset_model(model):
    """Clear the ``touched`` flag of every node in a buffered tree."""
    stack = [model]
    while stack:
        layer = stack.pop()
        (_, _, child, _, touched) = layer
        touched[0] = 0
        if child is not None:
            (_, left, right) = child
            stack.append(left)
            stack.append(right)


def clone_model(model, p_mutation):
    """Return a randomly mutated deep copy of a ``(constant, xors, child)`` model.

    ``p_mutation`` scales the probabilities of changing the constant,
    changing each coefficient, adding a child pair to a leaf, and dropping
    an existing child pair. With ``p_mutation == 0`` the result is an exact
    copy.
    """
    p_constant = p_mutation * random.random()
    p_flip = p_mutation * random.random()
    p_add_child = p_mutation * random.random()
    p_drop_child = p_mutation * random.random()

    (constant, xors, child) = model
    if random.random() < p_constant:
        constant += random.randint(0, M - 1)
        constant %= M
    clone_xors = np.zeros((N,))
    np.copyto(clone_xors, xors)
    for i in range(N):
        if random.random() < p_flip:
            # Always add a non-zero offset so the coefficient really changes.
            offset = 1 if M == 2 else random.randint(1, M - 1)
            clone_xors[i] += offset
            clone_xors[i] %= M
    if child is None:
        if random.random() < p_add_child:
            left = random_child(p_mutation)
            right = random_child(p_mutation)
            return (constant, clone_xors, (left, right))
        return (constant, clone_xors, None)
    if random.random() < p_drop_child:
        return (constant, clone_xors, None)
    (left, right) = child
    clone_left = clone_model(left, p_mutation)
    clone_right = clone_model(right, p_mutation)
    return (constant, clone_xors, (clone_left, clone_right))


def random_child(p_mutation):
    """Return a random leaf model with at least one non-zero coefficient.

    One randomly chosen coefficient is always non-zero; every other
    coefficient becomes non-zero with probability
    ``p_mutation * random.random()`` (drawn once per call).
    """
    constant = random.randint(0, M - 1)
    xors = np.zeros((N,))

    p_flip = p_mutation * random.random()

    index = random.randint(0, N - 1)
    xors[index] = 1 if M == 2 else random.randint(1, M - 1)
    for i in range(N):
        if i != index and random.random() < p_flip:
            xors[i] = 1 if M == 2 else random.randint(1, M - 1)
    return (constant, xors, None)


def null_candidate():
    """Return the model with constant 0, all-zero coefficients and no child."""
    return (0, np.zeros((N,)), None)


def size(model):
    """Return a complexity measure of ``model``.

    This is the sum of the node's coefficients, plus the product of the two
    child sizes when the node has children.
    """
    (_, xors, child) = model
    xor_size = np.sum(xors)
    if child is not None:
        (left, right) = child
        return xor_size + size(left) * size(right)
    return xor_size


def main():
    """Run the evolutionary search against ``sha`` and print per-epoch progress.

    Each epoch scores every candidate with ``fast_coherence`` on the residual
    ``(model(x) - g(x)) mod M``, keeps the best ``num_survivors`` and refills
    the population with mutated clones of them. The input sample is redrawn
    once the top score has repeated for several consecutive epochs.
    """
    epochs = 10000
    num_survivors = 100
    num_offspring = 10
    num_candidates = num_survivors + num_survivors * num_offspring
    sample_size = 128
    p_mutation = 0.5
    g = sha

    current_generation = [null_candidate() for _ in range(num_candidates)]

    inputs = random_sample(sample_size, N)
    scratch = np.zeros(N)
    expected_outputs = np.zeros((sample_size,))
    for i in range(sample_size):
        expected_outputs[i] = g(inputs[i])
    outputs = np.zeros((sample_size,))
    output_xor = np.zeros((sample_size,))
    scores = np.zeros((num_candidates,))
    max_score = 0
    last_score = 0
    streak = 0

    coherence_models = build_coherence_models(inputs, scratch)

    for epoch in range(epochs):
        for i in range(num_candidates):
            candidate = current_generation[i]
            f = compile(candidate)
            for j in range(sample_size):
                outputs[j] = f(inputs[j], scratch)
            # Residual between the model output and the target, modulo M.
            np.subtract(outputs, expected_outputs, output_xor)
            np.mod(output_xor, M, output_xor)
            scores[i] = fast_coherence(coherence_models, output_xor)

        # Indices of the best candidates, worst-to-best.
        top_n = sorted(
            range(len(scores)), key=lambda k: scores[k])[-num_survivors:]
        survivors = [current_generation[index] for index in top_n]

        top_score = scores[top_n[-1]]
        print(epoch, top_score, size(survivors[-1]))

        # NOTE(review): p_mutation is adapted here but never passed to
        # clone_model below, which draws a fresh random rate per clone.
        # Behaviour is left unchanged; confirm which one is intended.
        if top_score <= max_score:
            p_mutation += 0.01
        else:
            p_mutation = 0.5
            max_score = top_score

        for i in range(num_survivors):
            current_generation[i] = survivors[i]

        for i in range(num_survivors):
            candidate = survivors[i]
            for j in range(num_offspring):
                index = num_survivors + j * num_survivors + i
                current_generation[index] = clone_model(
                    candidate, random.random())

        if last_score == top_score:
            streak += 1
        else:
            streak = 0
        # The streak is not reset after resampling, so the sample keeps
        # being redrawn for as long as the top score stays the same.
        if streak >= 4:
            inputs = random_sample(sample_size, N)
            coherence_models = build_coherence_models(inputs, scratch)
            for i in range(sample_size):
                expected_outputs[i] = g(inputs[i])
        last_score = top_score


if __name__ == "__main__":
    main()