probabilities/mutations4.py
2023-01-01 18:45:51 -05:00

591 lines
20 KiB
Python

import hashlib
import math
from matplotlib import offsetbox
import numpy as np
import random
from struct import pack, pack_into, unpack_from
import secrets
from numpy import hamming
N = 32
M = 2
def bit_at_index(buffer, index):
offset = (index >> 3) % len(buffer)
return buffer[offset] & (1 << (index & 0b111)) != 0
def count_one_bits(n):
return bin(n).count("1")
def hamming_distance(a, b, scratch):
np.logical_xor(a, b, scratch)
return sum(scratch)
def encode_f(f, buffer, offset=0):
(inverted, flips, child) = f
pack_into('I', buffer, offset, inverted)
offset += 4
for index in flips:
pack_into('I', buffer, offset, 0)
offset += 4
pack_into('I', buffer, offset, index)
offset += 4
if child is None:
pack_into('I', buffer, offset, 1)
offset += 4
return offset
(inverted, left, right) = child
pack_into('I', buffer, offset, 2 if not inverted else 3)
offset += 4
offset = encode_f(left, buffer, offset)
offset = encode_f(right, buffer, offset)
return offset
def generate_random_branch(p_mutation):
global N
p_add_indices = p_mutation * random.random()
p_add_children = p_mutation * random.random()
inverted = random.randint(0, 1)
indices = set()
children = []
# randomly add indices
while random.random() < p_add_indices and len(indices) < N:
available_indices = [i for i in range(0, N) if i not in indices]
if len(available_indices) == 1:
indices.add(available_indices[0])
continue
indices.add(available_indices[random.randint(0, len(available_indices) - 1)])
# randomly add children
while random.random() < p_add_children:
child_inverted = random.randint(0, 1)
left = generate_random_branch(p_add_children)
right = generate_random_branch(p_add_children)
children.append((child_inverted, left, right))
return (inverted, indices, children)
def mutate_f(f, p_mutation):
global N
(inverted, indices, children) = f
mutated_indices = set(indices)
mutated_children = children[:]
p_invert = p_mutation * random.random()
p_drop_indices = p_mutation * random.random()
p_add_indices = p_mutation * random.random()
p_drop_children = p_mutation * random.random()
p_mutate_child = p_mutation * random.random()
p_clone_child = p_mutation * random.random()
p_invert_child = p_mutation * random.random()
p_add_children = p_mutation * random.random()
# randomly invert
if random.random() < p_invert:
inverted ^= 1
# randomly drop indices
while random.random() < p_drop_indices and len(mutated_indices) > 0:
mutated_indices.pop()
# randomly add indices
while random.random() < p_add_indices and len(mutated_indices) < N:
available_indices = [i for i in range(0, N) if i not in mutated_indices]
if len(available_indices) == 1:
mutated_indices.add(available_indices[0])
continue
mutated_indices.add(available_indices[random.randint(0, len(available_indices) - 1)])
# randomly drop children
while random.random() < p_drop_children and len(mutated_children) > 0:
if len(mutated_children) == 1:
del mutated_children[0]
break
del mutated_children[random.randint(0, len(mutated_children) - 1)]
# randomly clone children
while random.random() < p_clone_child and len(mutated_children) > 0:
index = 0 if len(mutated_children) == 1 else random.randint(0, len(mutated_children) - 1)
(child_inverted, left, right) = mutated_children[index]
if random.random() < p_invert_child:
child_inverted ^= 1
clone = (child_inverted, mutate_f(left, p_mutation), mutate_f(right, p_mutation))
mutated_children.append(clone)
# randomly mutate children
while random.random() < p_mutate_child and len(mutated_children) > 0:
index = 0 if len(mutated_children) == 1 else random.randint(0, len(mutated_children) - 1)
(child_inverted, left, right) = mutated_children[index]
if random.random() < p_invert_child:
child_inverted ^= 1
mutated_children[index] = (child_inverted, mutate_f(left, p_mutation), mutate_f(right, p_mutation))
# randomly add children
while random.random() < p_add_children:
child_inverted = random.randint(0, 1)
left = generate_random_branch(p_mutation)
right = generate_random_branch(p_mutation)
mutated_children.append((child_inverted, left, right))
return (inverted, mutated_indices, mutated_children)
def decode_f(buffer, mutate = False, offset = 0, skip_invert = False):
global N
inverted = 0
if not skip_invert:
[inverted] = unpack_from('I', buffer, offset)
offset += 4
# random invert
if mutate and random.random() < 0.01:
inverted ^= 1
inverted &= 0b1
flips = set()
# random add flip
while mutate and random.random() < 0.5 and len(flips) < N:
available_indices = [i for i in range(0, N) if i not in flips]
if len(available_indices) == 1:
flips.add(available_indices[0])
continue
flips.add(available_indices[random.randint(0, len(available_indices) - 1)])
while offset < len(buffer):
# random create branch
if mutate and random.random() < 0.01:
gate_inverted = random.randint(0, 1)
left = generate_random_branch()
(offset, right) = decode_f(buffer, mutate, offset, True)
return (offset, (inverted, flips, (gate_inverted, left, right)))
[opcode] = unpack_from('I', buffer, offset)
offset += 4
opcode &= 0b11
if opcode == 0:
[index] = unpack_from('I', buffer, offset)
offset += 4
# random skip flip
if mutate and random.random() < 0.01:
continue
if index in flips:
flips.remove(index)
else:
flips.add(index)
elif opcode == 1:
return (offset, (inverted, flips, None))
else:
(offset, left) = decode_f(buffer, mutate, offset)
(offset, right) = decode_f(buffer, mutate, offset)
gate_inverted = 0 if opcode == 2 else 1
# random invert
if mutate and random.random() < 0.01:
gate_inverted ^= 1
# random skip branch
if mutate and random.random() < 0.01:
return (offset, (inverted, flips, None))
return (offset, (inverted, flips, (gate_inverted, left, right)))
return (offset, (inverted, [], None))
def generate_program(model, output_var='output'):
global N, M
(constant, indices, child) = model
statement = 'multiply(' + np.array2string(indices, separator=',') + ', x, temp)\n\t'
statement += output_var + '=' + str(constant) + '+sum(temp)\n\t'
if not child is None:
left_output = output_var + '0'
right_output = output_var + '1'
(left, right) = child
statement += generate_program(left, left_output)
statement += generate_program(right, right_output)
statement += output_var + '+=' + left_output + '*' + right_output + '\n\t'
statement += output_var + '%=' + str(M) + '\n\t'
return statement
def compile(model):
program = 'def f(x, temp):\n\t' + generate_program(model) + 'return output'
scope = {'multiply': np.multiply, 'sum': np.sum}
exec(program, scope)
return scope['f']
def evaluate(model, x, value = 0):
(inverted, indices, children) = model
for i in indices:
if bit_at_index(x, i) != 0:
value ^= 1
for child in children:
(child_inverted, left, right) = child
left = evaluate(left, x)
right = evaluate(right, x)
if left & right != child_inverted:
value ^= 1
if inverted:
value ^= 1
return value
def encode(v):
byte_values = []
for i in range(0, math.ceil(N / 8)):
x = 0
for j in range(0, 8):
index = i * 8 + j
x <<= 1
x |= int(v[index])
byte_values.append(x)
return bytearray(x)
def sha(v):
global M
x = encode(v)
m = hashlib.sha256()
m.update(x)
result = m.digest()
return result[0] % M
def xor(x):
num_one_bits = 0
for n in x:
num_one_bits += count_one_bits(n)
return num_one_bits % 2
def random_sample(m, n):
inputs = np.zeros((m, n))
for i in range(0, m):
for j in range(0, n):
inputs[i][j] = random.randint(0, 1)
return inputs
def update_sample(sample, index):
global N
for j in range(0, N):
sample[index][j] = random.randint(0, 1)
def coherence(inputs, outputs, scratch):
coherences = []
for i in range(0, len(inputs)):
x_a = inputs[i]
y_a = outputs[i]
numerator = 0
denominator = 0
for j in range(0, len(inputs)):
if i == j:
continue
x_b = inputs[j]
y_b = outputs[j]
distance = hamming_distance(x_a, x_b, scratch)
weight = 1.0 / (2 ** distance)
denominator += weight
if y_a == y_b:
numerator += weight
coherence = numerator / denominator if denominator > 0 else 0
coherences.append(coherence)
return sum(coherences) / len(coherences)
def build_coherence_models(inputs, scratch):
coherence_models = []
for i in range(0, len(inputs)):
x_a = inputs[i]
distances = [hamming_distance(x_a, inputs[j], scratch) for j in range(0, len(inputs))]
indices = sorted(range(len(distances)), key=lambda i: distances[i])
lowest = -1
denominator = 0
components = []
for index in range(0, len(indices)):
j = indices[index]
if distances[j] == 0:
continue
if lowest < 0:
lowest = distances[j]
distance = distances[j] - lowest
if distance >= 8:
break
weight = 2 ** -distance
denominator += weight
components.append((weight, j))
coherence_models.append((denominator, components))
return coherence_models
def fast_coherence(coherence_models, outputs):
coherences = []
for i in range(0, len(coherence_models)):
(denominator, components) = coherence_models[i]
numerator = 0
for component in components:
(weight, j) = component
if outputs[i] == outputs[j]:
numerator += weight
coherence = numerator / denominator if denominator > 0 else 0
coherences.append(coherence)
return sum(coherences) / len(coherences)
def score(f, sample, distances):
return coherence([(x, f(x) ^ y) for (x, y) in sample], distances)
def compute_distances(inputs, distances, scratch):
for i in range(0, len(inputs)):
a = inputs[i]
for j in range(i, len(inputs)):
if i == j:
distances[i][j] = 0
continue
b = inputs[j]
distance = 2 ** -hamming_distance(a, b, scratch)
distances[i][j] = distance
distances[j][i] = distance
def update_distances(inputs, distances, i, scratch):
a = inputs[i]
for j in range(0, len(inputs)):
if i == j:
distances[i][j] = 0
continue
b = inputs[j]
distance = 2 ** -hamming_distance(a, b, scratch)
distances[i][j] = distance
distances[j][i] = distance
def clone_model(model, p_mutation):
global N, M
clone = model[:]
p_insert_node = p_mutation * random.random()
i = 0
while i < len(clone):
(bias, op, indices, (p_modify, p_bias, p_index)) = clone[i]
p_modify_node = p_modify
if random.random() < p_modify_node:
p_modify += 0.01
p_add_index = p_index
p_modify_bias = p_bias
indices = indices.copy()
if random.random() < p_modify_bias:
p_bias += 0.01
bias += random.randint(0, M - 1)
bias %= M
else:
p_bias -= 0.01
for index in range(0, N + i):
if random.random() < p_add_index:
p_index += 0.01
if index in indices:
indices.remove(index)
else:
indices.add(index)
else:
p_index -= 0.01
else:
p_modify -= 0.01
p_modify = min(max(0.01, p_modify), 0.99)
p_bias = min(max(0.01, p_bias), 0.99)
p_index = min(max(0.01, p_index), 0.99)
clone[i] = (bias, op, indices, (p_modify, p_bias, p_index))
i += 1
if random.random() < p_insert_node:
i = random.randint(0, len(clone))
clone.insert(i, random_node(N + i - 1, p_mutation))
for j in range(i + 1, len(clone)):
(bias, op, indices, p) = clone[j]
modified_indices = set()
for index in indices:
if index < N:
modified_indices.add(index)
continue
shifted_index = index - N
if shifted_index == i:
if random.randint(0, 1) == 0:
modified_indices.add(index)
else:
modified_indices.add(index + 1)
if shifted_index > i:
modified_indices.add(index + 1)
else:
modified_indices.add(index)
clone[j] = (bias, op, modified_indices, p)
return clone
def random_node(max_index, p_mutation):
global N
bias = random.randint(0, M - 1)
op = random.randint(0, 1)
p_modify = random.random()
p_bias = random.random()
p_index = random.random()
indices = set()
indices.add(random.randint(0, max_index))
p_add_index = p_mutation * random.random()
for index in range(0, max_index):
if random.random() < p_add_index:
indices.add(index)
return (bias, op, indices, (p_modify, p_bias, p_index))
def null_candidate():
global N
return []
def encode_tree(tree_model):
stack = [tree_model]
node_indices = {}
index = 0
while len(stack) > 0:
node = stack.pop()
node_indices[node] = index
index += 1
(p, bias, value) = node
if isinstance(value, int):
continue
(left, right) = value
stack.append(left)
stack.append(right)
length = index
stack = [tree_model]
serialized_model = []
while len(stack) > 0:
node = stack.pop()
(p, bias, value) = node
serialized_model.insert(0, )
def eval_model(model, buffer, x):
global N, M
for i in range(0, len(model)):
(bias, op, indices, _) = model[i]
value = op
for index in indices:
if index >= N + i:
print('This should not happen')
if op == 1:
value *= x[index] if index < N else buffer[index - N]
value %= M
else:
value += x[index] if index < N else buffer[index - N]
value %= M
value += bias
value %= M
if i == len(model) - 1:
return value
else:
buffer[i] = value
return 0
def size(model):
return len(model)
def main():
global N, M
epochs = 10000
num_survivors = 100
num_offspring = 10
num_candidates = num_survivors + num_survivors * num_offspring
sample_size = 64
eval_size = 100
max_nodes = 65536
p_mutation = 0.5
g = sha
current_generation = [null_candidate() for _ in range(0, num_candidates)]
distances = np.zeros((sample_size, sample_size))
output_equality = np.zeros((sample_size, sample_size))
inputs = random_sample(sample_size, N)
scratch = np.zeros(N,)
# compute_distances(inputs, distances, scratch)
expected_outputs = np.zeros((sample_size,))
for i in range(0, sample_size):
expected_outputs[i] = g(inputs[i])
outputs = np.zeros((sample_size,))
output_xor = np.zeros((sample_size,))
ones = np.ones((sample_size,))
numerators = np.zeros((sample_size,))
denominators = np.zeros((sample_size,))
coherences = np.zeros((sample_size,))
np.matmul(ones, distances, denominators)
scores = np.zeros((num_candidates,))
eval_buffer = np.zeros((max_nodes,))
max_score = 0
last_score = 0
streak = 0
coherence_models = build_coherence_models(inputs, scratch)
for epoch in range(0, epochs):
for i in range(0, num_candidates):
candidate = current_generation[i]
for j in range(0, sample_size):
outputs[j] = eval_model(candidate, eval_buffer, inputs[j])
np.subtract(outputs, expected_outputs, output_xor)
np.mod(output_xor, M, output_xor)
# for p in range(0, sample_size):
# for q in range(0, sample_size):
# m = int(output_xor[p])
# n = int(output_xor[q])
# distance = abs(m - n)
# if distance > M / 2:
# distance = M - distance
# distance /= (M / 2)
# distance **= 2
# output_equality[p][q] = distance
# # output_equality[p][q] = 1 if m == n else 0
# np.multiply(output_equality, distances, output_equality)
# np.matmul(ones, output_equality, numerators)
# np.divide(numerators, denominators, coherences)
# score = np.average(coherences)
score = fast_coherence(coherence_models, output_xor)
# if random.random() < 0.1:
# check = coherence(inputs, output_xor, scratch)
# if check - score > 1e-3:
# print('not equal')
scores[i] = score
top_n = sorted(range(len(scores)), key=lambda i: scores[i])[-num_survivors:]
survivors = [current_generation[index] for index in top_n]
# f = lambda x: evaluate(current_generation[0], x)
# correct = 0
# for i in range(0, eval_size):
# x = random_input()
# if f(x) == g(x):
# correct += 1
top_score = scores[top_n[-1]]
print(epoch, top_score, size(survivors[-1]))
if top_score <= max_score:
p_mutation += 0.01
else:
p_mutation = 0.5
max_score = top_score
for i in range(0, num_survivors):
current_generation[i] = survivors[i]
for i in range(0, num_survivors):
candidate = survivors[i]
for j in range(0, num_offspring):
index = num_survivors + j * num_survivors + i
current_generation[index] = clone_model(candidate, random.random() * 0.1)
inputs = random_sample(sample_size, N)
coherence_models = build_coherence_models(inputs, scratch)
for i in range(0, sample_size):
expected_outputs[i] = g(inputs[i])
# while random.random() < 0.5:
# if last_score == top_score:
# streak += 1
# else:
# streak = 0
# if streak >= 4:
# inputs = random_sample(sample_size, N)
# coherence_models = build_coherence_models(inputs, scratch)
# # compute_distances(inputs, distances, scratch)
# # np.matmul(ones, distances, denominators)
# for i in range(0, sample_size):
# expected_outputs[i] = g(inputs[i])
# streak = 0
# expected_outputs = np.zeros((sample_size,))
# for i in range(0, sample_size):
# expected_outputs[i] = g(inputs[i])
# index = random.randint(0, sample_size - 1)
# update_sample(inputs, index)
# expected_outputs[index] = g(inputs[index])
# update_distances(inputs, distances, index, scratch)
# np.matmul(ones, distances, denominators)
last_score = top_score
if __name__ == "__main__":
main()