# Listing metadata from the code viewer: 511 lines, 18 KiB, Python
|
import hashlib
|
||
|
import math
|
||
|
import numpy as np
|
||
|
import random
|
||
|
from struct import pack, pack_into, unpack_from
|
||
|
import secrets
|
||
|
|
||
|
from numpy import hamming
|
||
|
|
||
|
# Number of input bits per sample: bit indices range over [0, N) and every
# model's XOR mask is an array of length N.
N = 8
|
||
|
|
||
|
def bit_at_index(buffer, index):
    """Return True when bit ``index`` of ``buffer`` is set.

    Bits are numbered least-significant first within each byte, and the byte
    position wraps around modulo ``len(buffer)``.
    """
    byte_pos = (index >> 3) % len(buffer)
    mask = 1 << (index & 0b111)
    return (buffer[byte_pos] & mask) != 0
|
||
|
|
||
|
def count_one_bits(n):
    """Return the number of ``1`` digits in the binary representation of ``n``."""
    return sum(1 for digit in bin(n) if digit == "1")
|
||
|
|
||
|
def hamming_distance(a, b, scratch=None):
    """Return the number of positions at which ``a`` and ``b`` differ.

    ``a`` and ``b`` are treated as 1-D vectors of truth values. The
    element-wise XOR is written into ``scratch`` so that callers in hot loops
    can reuse one buffer.

    Args:
        a: First vector.
        b: Second vector, same shape as ``a``.
        scratch: Optional output array with the shape of ``a``; it is
            overwritten. When omitted, a temporary float array is allocated,
            which keeps two-argument callers working.

    Returns:
        The count of differing positions, in the dtype of ``scratch``.
    """
    if scratch is None:
        # A float buffer keeps the result a float, so ``2 ** -distance`` in
        # callers cannot hit NumPy's integer negative-power error.
        scratch = np.zeros(np.shape(a))
    np.logical_xor(a, b, scratch)
    return np.sum(scratch)
|
||
|
|
||
|
def encode_f(f, buffer, offset=0):
    """Serialise the tree ``f`` into ``buffer`` as native unsigned 32-bit words.

    Layout of one node: the ``inverted`` word, then a ``0, index`` word pair
    for every flip, then either ``1`` (no child) or ``2``/``3`` (plain or
    inverted gate) followed by the encoded left and right subtrees.

    Args:
        f: ``(inverted, flips, child)`` where ``child`` is ``None`` or
            ``(gate_inverted, left, right)``.
        buffer: Writable buffer accepted by ``struct.pack_into``.
        offset: Byte position at which to start writing.

    Returns:
        The byte offset just past the last word written.
    """
    inverted, flips, child = f

    header = [inverted]
    for index in flips:
        header.append(0)
        header.append(index)
    for word in header:
        pack_into('I', buffer, offset, word)
        offset += 4

    if child is None:
        pack_into('I', buffer, offset, 1)
        return offset + 4

    gate_inverted, left, right = child
    pack_into('I', buffer, offset, 3 if gate_inverted else 2)
    offset += 4
    for subtree in (left, right):
        offset = encode_f(subtree, buffer, offset)
    return offset
|
||
|
|
||
|
def generate_random_branch(p_mutation):
    """Build a random ``(inverted, indices, children)`` tree.

    Two continuation probabilities are drawn as ``p_mutation`` scaled by a
    uniform random factor. Distinct bit indices below ``N`` are added while
    the first keeps firing; ``(child_inverted, left, right)`` gates with
    recursively generated subtrees are appended while the second keeps firing.

    Args:
        p_mutation: Upper bound on the two continuation probabilities.

    Returns:
        ``(inverted, indices, children)`` with ``indices`` a set and
        ``children`` a list.
    """
    p_more_indices = p_mutation * random.random()
    p_more_children = p_mutation * random.random()

    inverted = random.randint(0, 1)
    chosen = set()
    gates = []

    # Grow the index set; the random draw is taken before the size check.
    while True:
        if not (random.random() < p_more_indices):
            break
        if len(chosen) >= N:
            break
        free = [i for i in range(0, N) if i not in chosen]
        if len(free) == 1:
            # Only one candidate left: take it without drawing a random number.
            chosen.add(free[0])
        else:
            chosen.add(free[random.randint(0, len(free) - 1)])

    # Grow the child list; subtrees inherit the child probability as their bound.
    while random.random() < p_more_children:
        gate_inverted = random.randint(0, 1)
        left = generate_random_branch(p_more_children)
        right = generate_random_branch(p_more_children)
        gates.append((gate_inverted, left, right))

    return (inverted, chosen, gates)
|
||
|
|
||
|
def mutate_f(f, p_mutation):
    """Return a randomly mutated copy of the tree ``f``.

    Eight probabilities are drawn up front, each ``p_mutation`` scaled by a
    uniform random factor. They are then applied in this order: invert the
    node, drop indices, add indices, drop children, clone children, mutate
    children in place, add fresh random children.

    Args:
        f: ``(inverted, indices, children)`` tree.
        p_mutation: Upper bound on every mutation probability.

    Returns:
        A new ``(inverted, indices, children)`` tuple. The index set and the
        child list are fresh containers; untouched child tuples are shared
        with ``f``.
    """
    inverted, indices, children = f
    new_indices = set(indices)
    new_children = children[:]

    # The order of these draws fixes how the random stream is consumed.
    p_invert = p_mutation * random.random()
    p_drop_indices = p_mutation * random.random()
    p_add_indices = p_mutation * random.random()
    p_drop_children = p_mutation * random.random()
    p_mutate_child = p_mutation * random.random()
    p_clone_child = p_mutation * random.random()
    p_invert_child = p_mutation * random.random()
    p_add_children = p_mutation * random.random()

    if random.random() < p_invert:
        inverted ^= 1

    # Drop arbitrary indices.
    while random.random() < p_drop_indices and len(new_indices) > 0:
        new_indices.pop()

    # Add indices that are not yet present.
    while random.random() < p_add_indices and len(new_indices) < N:
        free = [i for i in range(0, N) if i not in new_indices]
        if len(free) == 1:
            new_indices.add(free[0])
        else:
            new_indices.add(free[random.randint(0, len(free) - 1)])

    # Drop children; removing the last remaining child ends this phase.
    while random.random() < p_drop_children and len(new_children) > 0:
        if len(new_children) == 1:
            del new_children[0]
            break
        del new_children[random.randint(0, len(new_children) - 1)]

    # Append mutated clones of existing children.
    while random.random() < p_clone_child and len(new_children) > 0:
        if len(new_children) == 1:
            pick = 0
        else:
            pick = random.randint(0, len(new_children) - 1)
        gate_inverted, left, right = new_children[pick]
        if random.random() < p_invert_child:
            gate_inverted ^= 1
        new_children.append(
            (gate_inverted, mutate_f(left, p_mutation), mutate_f(right, p_mutation))
        )

    # Replace existing children by mutated versions of themselves.
    while random.random() < p_mutate_child and len(new_children) > 0:
        if len(new_children) == 1:
            pick = 0
        else:
            pick = random.randint(0, len(new_children) - 1)
        gate_inverted, left, right = new_children[pick]
        if random.random() < p_invert_child:
            gate_inverted ^= 1
        new_children[pick] = (
            gate_inverted, mutate_f(left, p_mutation), mutate_f(right, p_mutation)
        )

    # Append brand-new random children.
    while random.random() < p_add_children:
        gate_inverted = random.randint(0, 1)
        left = generate_random_branch(p_mutation)
        right = generate_random_branch(p_mutation)
        new_children.append((gate_inverted, left, right))

    return (inverted, new_indices, new_children)
|
||
|
|
||
|
def decode_f(buffer, mutate = False, offset = 0, skip_invert = False):
|
||
|
global N
|
||
|
inverted = 0
|
||
|
if not skip_invert:
|
||
|
[inverted] = unpack_from('I', buffer, offset)
|
||
|
offset += 4
|
||
|
# random invert
|
||
|
if mutate and random.random() < 0.01:
|
||
|
inverted ^= 1
|
||
|
inverted &= 0b1
|
||
|
flips = set()
|
||
|
# random add flip
|
||
|
while mutate and random.random() < 0.5 and len(flips) < N:
|
||
|
available_indices = [i for i in range(0, N) if i not in flips]
|
||
|
if len(available_indices) == 1:
|
||
|
flips.add(available_indices[0])
|
||
|
continue
|
||
|
flips.add(available_indices[random.randint(0, len(available_indices) - 1)])
|
||
|
while offset < len(buffer):
|
||
|
# random create branch
|
||
|
if mutate and random.random() < 0.01:
|
||
|
gate_inverted = random.randint(0, 1)
|
||
|
left = generate_random_branch()
|
||
|
(offset, right) = decode_f(buffer, mutate, offset, True)
|
||
|
return (offset, (inverted, flips, (gate_inverted, left, right)))
|
||
|
[opcode] = unpack_from('I', buffer, offset)
|
||
|
offset += 4
|
||
|
opcode &= 0b11
|
||
|
if opcode == 0:
|
||
|
[index] = unpack_from('I', buffer, offset)
|
||
|
offset += 4
|
||
|
# random skip flip
|
||
|
if mutate and random.random() < 0.01:
|
||
|
continue
|
||
|
if index in flips:
|
||
|
flips.remove(index)
|
||
|
else:
|
||
|
flips.add(index)
|
||
|
elif opcode == 1:
|
||
|
return (offset, (inverted, flips, None))
|
||
|
else:
|
||
|
(offset, left) = decode_f(buffer, mutate, offset)
|
||
|
(offset, right) = decode_f(buffer, mutate, offset)
|
||
|
gate_inverted = 0 if opcode == 2 else 1
|
||
|
# random invert
|
||
|
if mutate and random.random() < 0.01:
|
||
|
gate_inverted ^= 1
|
||
|
# random skip branch
|
||
|
if mutate and random.random() < 0.01:
|
||
|
return (offset, (inverted, flips, None))
|
||
|
return (offset, (inverted, flips, (gate_inverted, left, right)))
|
||
|
return (offset, (inverted, [], None))
|
||
|
|
||
|
def generate_program(f):
    """Render the tree ``f`` as a Python expression over a byte sequence ``x``.

    The expression XORs one bit test per index with one AND-gate term per
    child (each optionally prefixed by ``1^``), ends the chain with ``0``,
    and is itself prefixed by ``1^`` when the node is inverted.

    Args:
        f: ``(inverted, indices, children)`` with children given as
            ``(gate_inverted, left, right)`` tuples.

    Returns:
        The expression source as a string.
    """
    inverted, indices, children = f

    pieces = ["1^" if inverted else "", "("]
    for i in indices:
        pieces.append("(x[%s>>3]&(1<<(%s&0b111))!=0)^" % (str(i), str(i)))
    for gate_inverted, left, right in children:
        if gate_inverted:
            pieces.append("1^")
        pieces.append("((" + generate_program(left) + ")&(" + generate_program(right) + "))^")
    pieces.append("0)")
    return "".join(pieces)
|
||
|
|
||
|
def compile_f(f):
    """Compile the tree ``f`` into a callable ``f(x)``.

    The expression produced by ``generate_program`` is wrapped in a function
    definition and executed in a fresh namespace.

    Returns:
        The function object named ``f`` defined by the generated source.
    """
    namespace = {}
    source = 'def f(x):\n\treturn ' + generate_program(f)
    exec(source, namespace)
    return namespace['f']
|
||
|
|
||
|
def evaluate(model, x, value = 0):
    """Interpret the tree ``model`` on the input ``x`` and return one bit.

    Starting from ``value``, the result is toggled once for every listed
    index whose bit is set in ``x``, once for every child whose AND of the
    two evaluated subtrees differs from that child's inverted flag, and once
    more when the node itself is inverted.

    Args:
        model: ``(inverted, indices, children)`` tree.
        x: Byte sequence passed to ``bit_at_index``.
        value: Initial accumulator value.
    """
    inverted, indices, children = model
    result = value

    for i in indices:
        if bit_at_index(x, i) != 0:
            result ^= 1

    for child_inverted, left, right in children:
        left_bit = evaluate(left, x)
        right_bit = evaluate(right, x)
        if (left_bit & right_bit) != child_inverted:
            result ^= 1

    return result ^ 1 if inverted else result
|
||
|
|
||
|
def encode(v):
    """Pack the first ``N`` bits of ``v`` into bytes, most significant bit first.

    Args:
        v: Indexable sequence of 0/1 values (ints or floats).

    Returns:
        A ``bytearray`` of ``ceil(N / 8)`` bytes. Bit positions beyond the
        end of ``v`` are filled with zeros.
    """
    byte_values = []
    for i in range(0, math.ceil(N / 8)):
        x = 0
        for j in range(0, 8):
            index = i * 8 + j
            x <<= 1
            # Pad instead of raising when N is not a multiple of 8.
            if index < len(v):
                x |= int(v[index])
        byte_values.append(x)
    # The original returned bytearray(x): x zero bytes, not the packed bytes.
    return bytearray(byte_values)
|
||
|
|
||
|
def sha(v):
    """Return the lowest bit of the first SHA-256 digest byte of ``encode(v)``."""
    digest = hashlib.sha256(encode(v)).digest()
    return digest[0] & 0b1
|
||
|
|
||
|
def xor(x):
    """Return the parity (0 or 1) of the total number of set bits in ``x``.

    Each element of ``x`` is passed to ``count_one_bits``.
    """
    return sum(count_one_bits(n) for n in x) % 2
|
||
|
|
||
|
def random_sample(m, n):
    """Return an ``(m, n)`` float array filled with random 0/1 entries.

    Entries are drawn one at a time with ``random.randint`` in row-major
    order.
    """
    inputs = np.zeros((m, n))
    for row in inputs:
        for col in range(n):
            row[col] = random.randint(0, 1)
    return inputs
|
||
|
|
||
|
def update_sample(sample, index):
    """Overwrite the first ``N`` entries of row ``index`` with random bits.

    ``sample`` is modified in place; nothing is returned.
    """
    fresh_bits = (random.randint(0, 1) for _ in range(0, N))
    for j, bit in enumerate(fresh_bits):
        sample[index][j] = bit
|
||
|
|
||
|
def coherence(inputs, outputs):
    """Return the mean neighbourhood agreement of ``outputs`` over ``inputs``.

    For each sample ``i`` every other sample ``j`` contributes a weight of
    ``1 / 2 ** hamming_distance(inputs[i], inputs[j])``. The per-sample value
    is the weighted fraction of neighbours with ``outputs[j] == outputs[i]``
    (0 when the total weight is 0), and the result is the mean of these
    values.

    Args:
        inputs: Sequence of equally shaped bit vectors.
        outputs: Sequence of values, one per input, compared with ``==``.

    Returns:
        The mean coherence, or 0 for an empty ``inputs``.
    """
    count = len(inputs)
    if count == 0:
        # Avoid dividing by zero when there is nothing to average.
        return 0

    # hamming_distance needs an output buffer; the original call omitted it
    # and raised TypeError.
    scratch = np.zeros(np.shape(inputs[0]))

    total = 0
    for i in range(count):
        x_a = inputs[i]
        y_a = outputs[i]
        numerator = 0
        denominator = 0
        for j in range(count):
            if i == j:
                continue
            distance = hamming_distance(x_a, inputs[j], scratch)
            weight = 1.0 / (2 ** distance)
            denominator += weight
            if y_a == outputs[j]:
                numerator += weight
        total += numerator / denominator if denominator > 0 else 0
    return total / count
|
||
|
|
||
|
def score(f, sample, distances):
    """Return ``coherence`` of the residuals ``f(x) ^ y`` over ``sample``.

    ``sample`` is iterated as ``(x, y)`` pairs.

    NOTE(review): ``coherence`` is declared as ``coherence(inputs, outputs)``;
    here it receives the list of ``(x, residual)`` pairs as ``inputs`` and
    ``distances`` as ``outputs`` -- confirm this call still matches it.
    """
    residual_pairs = [(x, f(x) ^ y) for (x, y) in sample]
    return coherence(residual_pairs, distances)
|
||
|
|
||
|
def compute_distances(inputs, distances, scratch):
    """Fill ``distances`` with pairwise proximity weights between ``inputs``.

    Entry ``[i][j]`` (and its mirror ``[j][i]``) is set to
    ``2 ** -hamming_distance(inputs[i], inputs[j])``; the diagonal is set to 0.

    Args:
        inputs: Sequence of bit vectors.
        distances: Square matrix, modified in place.
        scratch: Work buffer handed to ``hamming_distance``.
    """
    count = len(inputs)
    for i in range(count):
        distances[i][i] = 0
        a = inputs[i]
        # Only the upper triangle is computed; each value is mirrored.
        for j in range(i + 1, count):
            weight = 2 ** -hamming_distance(a, inputs[j], scratch)
            distances[i][j] = weight
            distances[j][i] = weight
|
||
|
|
||
|
def update_distances(inputs, distances, i, scratch):
    """Recompute row and column ``i`` of ``distances`` in place.

    Each off-diagonal entry becomes
    ``2 ** -hamming_distance(inputs[i], inputs[j])``; the diagonal entry is
    set to 0.

    Args:
        inputs: Sequence of bit vectors.
        distances: Square matrix, modified in place.
        i: Index of the input whose weights are refreshed.
        scratch: Work buffer handed to ``hamming_distance``.
    """
    a = inputs[i]
    for j, b in enumerate(inputs):
        if j == i:
            distances[i][j] = 0
            continue
        weight = 2 ** -hamming_distance(a, b, scratch)
        distances[i][j] = weight
        distances[j][i] = weight
|
||
|
|
||
|
def evaluate_sample(model, sample, output):
    """Evaluate ``model`` on every row of ``sample`` and store it in ``output``.

    Each node is ``(inverted, xors, child, scratch, touched)``. A node's
    result is written into its own ``scratch``: ``(sample @ xors) mod 2``,
    XORed with 1 when inverted and, for a node with a child gate, XORed with
    the product of the two subtree results (itself XORed with 1 when the gate
    is inverted). ``touched[0]`` marks a node whose ``scratch`` is ready.

    Nodes wait in a queue: they are taken from the end of the list, and a
    gate node whose children are not both ready is re-inserted at the front
    together with its children.

    Args:
        model: Root node.
        sample: 2-D array of inputs, one row per sample.
        output: 1-D array. It is used as a temporary for the gate product and
            finally receives a copy of the root's ``scratch``.

    All ``touched`` flags are cleared again through ``reset_model`` before
    returning.
    """
    (_, _, _, root_scratch, _) = model
    pending = [model]

    while pending:
        node = pending.pop()
        inverted, xors, child, scratch, touched = node

        if child is None:
            np.matmul(sample, xors, scratch)
            np.mod(scratch, 2, scratch)
            if inverted == 1:
                np.logical_xor(1, scratch, scratch)
            touched[0] = 1
            continue

        child_inverted, left, right = child
        (_, _, _, left_scratch, left_touched) = left
        (_, _, _, right_scratch, right_touched) = right

        if not (left_touched[0] and right_touched[0]):
            # Not ready yet: queue the node again, followed by both children.
            pending[:0] = [right, left, node]
            continue

        np.multiply(left_scratch, right_scratch, output)
        np.matmul(sample, xors, scratch)
        np.mod(scratch, 2, scratch)
        if inverted:
            np.logical_xor(scratch, 1, scratch)
        if child_inverted:
            np.logical_xor(output, 1, output)
        np.logical_xor(scratch, output, scratch)
        touched[0] = 1

    np.copyto(output, root_scratch)
    reset_model(model)
|
||
|
|
||
|
def reset_model(model):
    """Set ``touched[0]`` to 0 on ``model`` and on every node below it.

    Nodes are ``(inverted, xors, child, scratch, touched)`` with ``child``
    either ``None`` or ``(child_inverted, left, right)``.
    """
    pending = [model]
    while pending:
        _, _, child, _, touched = pending.pop()
        touched[0] = 0
        if child is not None:
            _, left, right = child
            pending.extend((left, right))
|
||
|
|
||
|
def clone_model(model, p_mutation):
    """Return a deep, randomly mutated copy of ``model``.

    Four probabilities are drawn as ``p_mutation`` scaled by a uniform random
    factor: inverting the node, inverting its child gate, flipping each XOR
    mask bit, and attaching a new child gate to a node that has none.
    Dropping an existing child is currently disabled (probability 0).

    Args:
        model: ``(inverted, xors, child, scratch, touched)`` node.
        p_mutation: Upper bound on every mutation probability.

    Returns:
        A new node tuple with its own ``xors`` array and zero-filled
        ``scratch`` and ``touched`` arrays of the original shapes.
    """
    p_invert = p_mutation * random.random()
    p_invert_child = p_mutation * random.random()
    p_flip = p_mutation * random.random()
    p_add_child = p_mutation * random.random()
    # Child removal is switched off.
    p_drop_child = 0

    (inverted, xors, child, scratch, touched) = model
    if random.random() < p_invert:
        inverted ^= 1

    clone_xors = np.zeros((N,))
    np.copyto(clone_xors, xors)
    for i in range(0, N):
        if random.random() < p_flip:
            clone_xors[i] = int(clone_xors[i]) ^ 1

    clone_scratch = np.zeros(np.shape(scratch))
    clone_touched = np.zeros(np.shape(touched))

    if child is None:
        if random.random() < p_add_child:
            sample_size = len(scratch)
            child_inverted = random.randint(0, 1)
            left = random_child(sample_size, p_mutation)
            right = random_child(sample_size, p_mutation)
            return (inverted, clone_xors, (child_inverted, left, right), clone_scratch, clone_touched)
        return (inverted, clone_xors, None, clone_scratch, clone_touched)

    if random.random() < p_drop_child:
        return (inverted, clone_xors, None, clone_scratch, clone_touched)

    (child_inverted, left, right) = child
    if random.random() < p_invert_child:
        # Flip the gate, not the node: the original toggled ``inverted`` here,
        # so an existing gate's polarity could never mutate.
        child_inverted ^= 1
    clone_left = clone_model(left, p_mutation)
    clone_right = clone_model(right, p_mutation)
    return (inverted, clone_xors, (child_inverted, clone_left, clone_right), clone_scratch, clone_touched)
|
||
|
|
||
|
def random_child(sample_size, p_mutation):
    """Create a random childless node for ``sample_size`` samples.

    One randomly chosen bit of the XOR mask is always set; every bit is
    additionally set with a probability drawn as ``p_mutation`` scaled by a
    uniform random factor.

    Returns:
        ``(inverted, xors, None, scratch, touched)`` with zero-filled
        ``scratch`` (length ``sample_size``) and ``touched`` (length 1).
    """
    inverted = random.randint(0, 1)
    mask = np.zeros((N,))
    scratch = np.zeros((sample_size,))
    touched = np.zeros((1,))

    p_flip = p_mutation * random.random()
    # Drawn but not used: generating nested children is disabled. The draw is
    # kept so the random stream is consumed exactly as before.
    p_child = p_mutation * random.random()

    # Guarantee a non-empty mask.
    mask[random.randint(0, N - 1)] = 1
    for position in range(0, N):
        if random.random() < p_flip:
            mask[position] = 1

    return (inverted, mask, None, scratch, touched)
|
||
|
|
||
|
def size(model):
    """Return a size measure for ``model``.

    The measure is the sum of the node's XOR mask, plus the product of the
    sizes of the two subtrees when the node has a child gate.
    """
    _, xors, child, _, _ = model
    own = np.sum(xors)
    if child is None:
        return own
    _, left, right = child
    return own + size(left) * size(right)
|
||
|
|
||
|
def null_candidate(sample_size):
    """Return a non-inverted, childless node with an all-zero XOR mask.

    The node carries a zero-filled ``scratch`` array of length
    ``sample_size`` and a zero-filled ``touched`` array of length 1.
    """
    xors = np.zeros((N,))
    scratch = np.zeros((sample_size,))
    touched = np.zeros((1,))
    return (0, xors, None, scratch, touched)
|
||
|
|
||
|
def main():
    """Run the evolutionary search and print progress once per epoch.

    A population of models is scored on a random sample of inputs against
    the target function ``sha``. A candidate's score is the mean, over
    samples, of the proximity-weighted fraction of other samples whose error
    bit (model output XOR expected output) equals its own. The best
    ``num_survivors`` candidates are kept and each is cloned with mutation
    ``num_offspring`` times. Whenever the top score repeats from one epoch to
    the next, a fresh input sample is drawn.
    """
    epochs = 10000
    num_survivors = 100
    num_offspring = 10
    num_candidates = num_survivors + num_survivors * num_offspring
    sample_size = 32
    p_mutation = 0.5
    g = sha
    current_generation = [null_candidate(sample_size) for _ in range(num_candidates)]

    # Work buffers are allocated once and reused through NumPy's ``out``
    # arguments.
    distances = np.zeros((sample_size, sample_size))
    output_equality = np.zeros((sample_size, sample_size))
    inputs = random_sample(sample_size, N)
    scratch = np.zeros(N)
    compute_distances(inputs, distances, scratch)
    expected_outputs = np.zeros((sample_size,))
    for i in range(sample_size):
        expected_outputs[i] = g(inputs[i])
    outputs = np.zeros((sample_size,))
    output_xor = np.zeros((sample_size,))
    ones = np.ones((sample_size,))
    numerators = np.zeros((sample_size,))
    denominators = np.zeros((sample_size,))
    coherences = np.zeros((sample_size,))
    # Total neighbour weight per sample.
    np.matmul(ones, distances, denominators)
    scores = np.zeros((num_candidates,))
    max_score = 0
    last_score = 0

    for epoch in range(epochs):
        for i in range(num_candidates):
            candidate = current_generation[i]
            evaluate_sample(candidate, inputs, outputs)
            np.logical_xor(outputs, expected_outputs, output_xor)
            # output_equality[p][q] = 1 when samples p and q have the same
            # error bit. One broadcast comparison replaces the former Python
            # double loop computing 1 ^ m ^ n for every pair.
            np.equal(
                output_xor[:, np.newaxis],
                output_xor[np.newaxis, :],
                out=output_equality,
            )
            np.multiply(output_equality, distances, output_equality)
            np.matmul(ones, output_equality, numerators)
            np.divide(numerators, denominators, coherences)
            scores[i] = np.average(coherences)

        # Ascending sort: the best candidate is last.
        top_n = sorted(range(len(scores)), key=lambda i: scores[i])[-num_survivors:]
        survivors = [current_generation[index] for index in top_n]

        top_score = scores[top_n[-1]]
        print(epoch, top_score, size(survivors[-1]))

        # NOTE(review): p_mutation is adapted here, but offspring below are
        # cloned with a fresh random.random() instead -- confirm intent.
        if top_score <= max_score:
            p_mutation += 0.01
        else:
            p_mutation = 0.5
            max_score = top_score

        for i in range(num_survivors):
            current_generation[i] = survivors[i]

        # Offspring j of survivor i goes into slot
        # num_survivors + j * num_survivors + i.
        for i in range(num_survivors):
            candidate = survivors[i]
            for j in range(num_offspring):
                index = num_survivors + j * num_survivors + i
                current_generation[index] = clone_model(candidate, random.random())

        # Draw a new sample when the top score did not change.
        if last_score == top_score:
            inputs = random_sample(sample_size, N)
            compute_distances(inputs, distances, scratch)
            np.matmul(ones, distances, denominators)
            for i in range(sample_size):
                expected_outputs[i] = g(inputs[i])
        last_score = top_score
|
||
|
|
||
|
# Run the search only when the file is executed as a script.
if __name__ == "__main__":
    main()
|