probabilities/mutations.py

511 lines
18 KiB
Python
Raw Normal View History

2023-01-01 23:45:51 +00:00
import hashlib
import math
import numpy as np
import random
from struct import pack, pack_into, unpack_from
import secrets
from numpy import hamming
N = 8
def bit_at_index(buffer, index):
offset = (index >> 3) % len(buffer)
return buffer[offset] & (1 << (index & 0b111)) != 0
def count_one_bits(n):
return bin(n).count("1")
def hamming_distance(a, b, scratch):
np.logical_xor(a, b, scratch)
return sum(scratch)
def encode_f(f, buffer, offset=0):
(inverted, flips, child) = f
pack_into('I', buffer, offset, inverted)
offset += 4
for index in flips:
pack_into('I', buffer, offset, 0)
offset += 4
pack_into('I', buffer, offset, index)
offset += 4
if child is None:
pack_into('I', buffer, offset, 1)
offset += 4
return offset
(inverted, left, right) = child
pack_into('I', buffer, offset, 2 if not inverted else 3)
offset += 4
offset = encode_f(left, buffer, offset)
offset = encode_f(right, buffer, offset)
return offset
def generate_random_branch(p_mutation):
global N
p_add_indices = p_mutation * random.random()
p_add_children = p_mutation * random.random()
inverted = random.randint(0, 1)
indices = set()
children = []
# randomly add indices
while random.random() < p_add_indices and len(indices) < N:
available_indices = [i for i in range(0, N) if i not in indices]
if len(available_indices) == 1:
indices.add(available_indices[0])
continue
indices.add(available_indices[random.randint(0, len(available_indices) - 1)])
# randomly add children
while random.random() < p_add_children:
child_inverted = random.randint(0, 1)
left = generate_random_branch(p_add_children)
right = generate_random_branch(p_add_children)
children.append((child_inverted, left, right))
return (inverted, indices, children)
def mutate_f(f, p_mutation):
global N
(inverted, indices, children) = f
mutated_indices = set(indices)
mutated_children = children[:]
p_invert = p_mutation * random.random()
p_drop_indices = p_mutation * random.random()
p_add_indices = p_mutation * random.random()
p_drop_children = p_mutation * random.random()
p_mutate_child = p_mutation * random.random()
p_clone_child = p_mutation * random.random()
p_invert_child = p_mutation * random.random()
p_add_children = p_mutation * random.random()
# randomly invert
if random.random() < p_invert:
inverted ^= 1
# randomly drop indices
while random.random() < p_drop_indices and len(mutated_indices) > 0:
mutated_indices.pop()
# randomly add indices
while random.random() < p_add_indices and len(mutated_indices) < N:
available_indices = [i for i in range(0, N) if i not in mutated_indices]
if len(available_indices) == 1:
mutated_indices.add(available_indices[0])
continue
mutated_indices.add(available_indices[random.randint(0, len(available_indices) - 1)])
# randomly drop children
while random.random() < p_drop_children and len(mutated_children) > 0:
if len(mutated_children) == 1:
del mutated_children[0]
break
del mutated_children[random.randint(0, len(mutated_children) - 1)]
# randomly clone children
while random.random() < p_clone_child and len(mutated_children) > 0:
index = 0 if len(mutated_children) == 1 else random.randint(0, len(mutated_children) - 1)
(child_inverted, left, right) = mutated_children[index]
if random.random() < p_invert_child:
child_inverted ^= 1
clone = (child_inverted, mutate_f(left, p_mutation), mutate_f(right, p_mutation))
mutated_children.append(clone)
# randomly mutate children
while random.random() < p_mutate_child and len(mutated_children) > 0:
index = 0 if len(mutated_children) == 1 else random.randint(0, len(mutated_children) - 1)
(child_inverted, left, right) = mutated_children[index]
if random.random() < p_invert_child:
child_inverted ^= 1
mutated_children[index] = (child_inverted, mutate_f(left, p_mutation), mutate_f(right, p_mutation))
# randomly add children
while random.random() < p_add_children:
child_inverted = random.randint(0, 1)
left = generate_random_branch(p_mutation)
right = generate_random_branch(p_mutation)
mutated_children.append((child_inverted, left, right))
return (inverted, mutated_indices, mutated_children)
def decode_f(buffer, mutate = False, offset = 0, skip_invert = False):
global N
inverted = 0
if not skip_invert:
[inverted] = unpack_from('I', buffer, offset)
offset += 4
# random invert
if mutate and random.random() < 0.01:
inverted ^= 1
inverted &= 0b1
flips = set()
# random add flip
while mutate and random.random() < 0.5 and len(flips) < N:
available_indices = [i for i in range(0, N) if i not in flips]
if len(available_indices) == 1:
flips.add(available_indices[0])
continue
flips.add(available_indices[random.randint(0, len(available_indices) - 1)])
while offset < len(buffer):
# random create branch
if mutate and random.random() < 0.01:
gate_inverted = random.randint(0, 1)
left = generate_random_branch()
(offset, right) = decode_f(buffer, mutate, offset, True)
return (offset, (inverted, flips, (gate_inverted, left, right)))
[opcode] = unpack_from('I', buffer, offset)
offset += 4
opcode &= 0b11
if opcode == 0:
[index] = unpack_from('I', buffer, offset)
offset += 4
# random skip flip
if mutate and random.random() < 0.01:
continue
if index in flips:
flips.remove(index)
else:
flips.add(index)
elif opcode == 1:
return (offset, (inverted, flips, None))
else:
(offset, left) = decode_f(buffer, mutate, offset)
(offset, right) = decode_f(buffer, mutate, offset)
gate_inverted = 0 if opcode == 2 else 1
# random invert
if mutate and random.random() < 0.01:
gate_inverted ^= 1
# random skip branch
if mutate and random.random() < 0.01:
return (offset, (inverted, flips, None))
return (offset, (inverted, flips, (gate_inverted, left, right)))
return (offset, (inverted, [], None))
def generate_program(f):
statement = ""
(inverted, indices, children) = f
if inverted:
statement += "1^"
statement += "("
for i in indices:
statement += "(x[" + str(i) + ">>3]&(1<<(" + str(i) + "&0b111))!=0)^"
for child in children:
(gate_inverted, left, right) = child
if gate_inverted:
statement += "1^"
statement += "((" + generate_program(left) + ")&(" + generate_program(right) + "))^"
statement += "0)"
return statement
def compile_f(f):
program = 'def f(x):\n\treturn ' + generate_program(f)
scope = {}
exec(program, scope)
return scope['f']
def evaluate(model, x, value = 0):
(inverted, indices, children) = model
for i in indices:
if bit_at_index(x, i) != 0:
value ^= 1
for child in children:
(child_inverted, left, right) = child
left = evaluate(left, x)
right = evaluate(right, x)
if left & right != child_inverted:
value ^= 1
if inverted:
value ^= 1
return value
def encode(v):
byte_values = []
for i in range(0, math.ceil(N / 8)):
x = 0
for j in range(0, 8):
index = i * 8 + j
x <<= 1
x |= int(v[index])
byte_values.append(x)
return bytearray(x)
def sha(v):
x = encode(v)
m = hashlib.sha256()
m.update(x)
result = m.digest()
return result[0] & 0b1
def xor(x):
num_one_bits = 0
for n in x:
num_one_bits += count_one_bits(n)
return num_one_bits % 2
def random_sample(m, n):
inputs = np.zeros((m, n))
for i in range(0, m):
for j in range(0, n):
inputs[i][j] = random.randint(0, 1)
return inputs
def update_sample(sample, index):
global N
for j in range(0, N):
sample[index][j] = random.randint(0, 1)
def coherence(inputs, outputs):
coherences = []
for i in range(0, len(inputs)):
x_a = inputs[i]
y_a = outputs[i]
numerator = 0
denominator = 0
for j in range(0, len(inputs)):
if i == j:
continue
x_b = inputs[j]
y_b = outputs[j]
distance = hamming_distance(x_a, x_b)
weight = 1.0 / (2 ** distance)
denominator += weight
if y_a == y_b:
numerator += weight
coherence = numerator / denominator if denominator > 0 else 0
coherences.append(coherence)
return sum(coherences) / len(coherences)
def score(f, sample, distances):
return coherence([(x, f(x) ^ y) for (x, y) in sample], distances)
def compute_distances(inputs, distances, scratch):
for i in range(0, len(inputs)):
a = inputs[i]
for j in range(i, len(inputs)):
if i == j:
distances[i][j] = 0
continue
b = inputs[j]
distance = 2 ** -hamming_distance(a, b, scratch)
distances[i][j] = distance
distances[j][i] = distance
def update_distances(inputs, distances, i, scratch):
a = inputs[i]
for j in range(0, len(inputs)):
if i == j:
distances[i][j] = 0
continue
b = inputs[j]
distance = 2 ** -hamming_distance(a, b, scratch)
distances[i][j] = distance
distances[j][i] = distance
def evaluate_sample(model, sample, output):
stack = [model]
(_, _, _, root_scratch, _) = model
while len(stack) > 0:
layer = stack.pop()
(inverted, xors, child, scratch, touched) = layer
if child is None:
np.matmul(sample, xors, scratch)
np.mod(scratch, 2, scratch)
if inverted == 1:
np.logical_xor(1, scratch, scratch)
touched[0] = 1
else:
(child_inverted, left, right) = child
(_, _, _, left_scratch, left_touched) = left
(_, _, _, right_scratch, right_touched) = right
if left_touched[0] and right_touched[0]:
np.multiply(left_scratch, right_scratch, output)
np.matmul(sample, xors, scratch)
np.mod(scratch, 2, scratch)
if inverted:
np.logical_xor(scratch, 1, scratch)
if child_inverted:
np.logical_xor(output, 1, output)
np.logical_xor(scratch, output, scratch)
touched[0] = 1
else:
stack.insert(0, layer)
stack.insert(0, left)
stack.insert(0, right)
np.copyto(output, root_scratch)
reset_model(model)
def reset_model(model):
stack = [model]
while len(stack) > 0:
layer = stack.pop()
(_, _, child, _, touched) = layer
touched[0] = 0
if not child is None:
(_, left, right) = child
stack.append(left)
stack.append(right)
def clone_model(model, p_mutation):
global N
p_invert = p_mutation * random.random()
p_invert_child = p_mutation * random.random()
p_flip = p_mutation * random.random()
p_add_child = p_mutation * random.random()
# p_drop_child = p_mutation * random.random() * 0.5
p_drop_child = 0
(inverted, xors, child, scratch, touched) = model
if random.random() < p_invert:
inverted ^= 1
clone_xors = np.zeros((N,))
np.copyto(clone_xors, xors)
for i in range(0, N):
if random.random() < p_flip:
clone_xors[i] = int(clone_xors[i]) ^ 1
clone_scratch = np.zeros(np.shape(scratch))
clone_touched = np.zeros(np.shape(touched))
if child is None:
if random.random() < p_add_child:
sample_size = len(scratch)
child_inverted = random.randint(0, 1)
left = random_child(sample_size, p_mutation)
right = random_child(sample_size, p_mutation)
return (inverted, clone_xors, (child_inverted, left, right), clone_scratch, clone_touched)
return (inverted, clone_xors, None, clone_scratch, clone_touched)
if random.random() < p_drop_child:
return (inverted, clone_xors, None, clone_scratch, clone_touched)
(child_inverted, left, right) = child
if random.random() < p_invert_child:
inverted ^= 1
clone_left = clone_model(left, p_mutation)
clone_right = clone_model(right, p_mutation)
return (inverted, clone_xors, (child_inverted, clone_left, clone_right), clone_scratch, clone_touched)
def random_child(sample_size, p_mutation):
global N
inverted = random.randint(0, 1)
xors = np.zeros((N,))
scratch = np.zeros((sample_size,))
touched = np.zeros((1,))
p_flip = p_mutation * random.random()
p_child = p_mutation * random.random()
index = random.randint(0, N - 1)
xors[index] = 1
for i in range(0, N):
if random.random() < p_flip:
xors[i] = 1
# if random.random() < p_child:
# child_inverted = random.randint(0, 1)
# left = random_child(sample_size, p_mutation * random.random())
# right = random_child(sample_size, p_mutation * random.random())
# return (inverted, xors, (child_inverted, left, right), scratch, touched)
return (inverted, xors, None, scratch, touched)
def size(model):
(_, xors, child, _, _) = model
xor_size = np.sum(xors)
if not child is None:
(_, left, right) = child
return xor_size + size(left) * size(right)
return xor_size
def null_candidate(sample_size):
global N
return (0, np.zeros((N,)), None, np.zeros((sample_size,)), np.zeros((1,)))
def main():
global N
epochs = 10000
num_survivors = 100
num_offspring = 10
num_candidates = num_survivors + num_survivors * num_offspring
sample_size = 32
eval_size = 100
p_mutation = 0.5
g = sha
current_generation = [null_candidate(sample_size) for _ in range(0, num_candidates)]
distances = np.zeros((sample_size, sample_size))
output_equality = np.zeros((sample_size, sample_size))
inputs = random_sample(sample_size, N)
scratch = np.zeros(N,)
compute_distances(inputs, distances, scratch)
expected_outputs = np.zeros((sample_size,))
for i in range(0, sample_size):
expected_outputs[i] = g(inputs[i])
outputs = np.zeros((sample_size,))
output_xor = np.zeros((sample_size,))
ones = np.ones((sample_size,))
numerators = np.zeros((sample_size,))
denominators = np.zeros((sample_size,))
coherences = np.zeros((sample_size,))
np.matmul(ones, distances, denominators)
scores = np.zeros((num_candidates,))
max_score = 0
last_score = 0
streak = 0
for epoch in range(0, epochs):
for i in range(0, num_candidates):
candidate = current_generation[i]
evaluate_sample(candidate, inputs, outputs)
np.logical_xor(outputs, expected_outputs, output_xor)
for p in range(0, sample_size):
for q in range(0, sample_size):
m = int(output_xor[p])
n = int(output_xor[q])
output_equality[p][q] = 1 ^ m ^ n
np.multiply(output_equality, distances, output_equality)
np.matmul(ones, output_equality, numerators)
np.divide(numerators, denominators, coherences)
score = np.average(coherences)
scores[i] = score
top_n = sorted(range(len(scores)), key=lambda i: scores[i])[-num_survivors:]
survivors = [current_generation[index] for index in top_n]
# f = lambda x: evaluate(current_generation[0], x)
# correct = 0
# for i in range(0, eval_size):
# x = random_input()
# if f(x) == g(x):
# correct += 1
top_score = scores[top_n[-1]]
print(epoch, top_score, size(survivors[-1]))
if top_score <= max_score:
p_mutation += 0.01
else:
p_mutation = 0.5
max_score = top_score
for i in range(0, num_survivors):
current_generation[i] = survivors[i]
for i in range(0, num_survivors):
candidate = survivors[i]
for j in range(0, num_offspring):
index = num_survivors + j * num_survivors + i
current_generation[index] = clone_model(candidate, random.random())
# while random.random() < 0.5:
if last_score == top_score:
# streak += 1
# else:
# streak = 0
# if streak >= 4:
# streak = 0
inputs = random_sample(sample_size, N)
compute_distances(inputs, distances, scratch)
np.matmul(ones, distances, denominators)
for i in range(0, sample_size):
expected_outputs[i] = g(inputs[i])
# expected_outputs = np.zeros((sample_size,))
# for i in range(0, sample_size):
# expected_outputs[i] = g(inputs[i])
# index = random.randint(0, sample_size - 1)
# update_sample(inputs, index)
# expected_outputs[index] = g(inputs[index])
# update_distances(inputs, distances, index, scratch)
# np.matmul(ones, distances, denominators)
last_score = top_score
if __name__ == "__main__":
main()