417 lines
15 KiB
Python
417 lines
15 KiB
Python
import hashlib
|
|
import math
|
|
import numpy as np
|
|
import random
|
|
|
|
# Number of input bits each candidate reads.
N = 8
# Modulus for all candidate arithmetic (2 => boolean/GF(2)-style functions).
M = 2
|
|
|
|
class Candidate:
    """One node of a candidate program tree.

    A node computes `bias + sum(offsets[i] * x[i]) (mod M)`; when it has
    children, the product of the two child values is added in as well
    (see evaluate_candidate).
    """

    def __init__(self):
        self.bias = 0
        # Indicator per input bit: offsets[i] == 1 means bit i contributes.
        # np.zeros(N, dtype=...) allocates once instead of zeros().astype().
        self.offsets = np.zeros(N, dtype=np.int32)
        self.has_child = 0
        self.left = None
        self.right = None

    def addOffset(self, x):
        """Mark input bit x as active. Returns self for chaining."""
        self.offsets[x] = 1
        return self

    def setChild(self, left, right):
        """Attach child candidates whose product augments this node's value.

        Returns self for chaining.
        """
        self.has_child = 1
        self.left = left
        self.right = right
        return self
|
|
|
|
class Probabilities:
    """Per-node sampling distribution over Candidate parameters.

    Mirrors the Candidate tree: each node holds probabilities used by
    create_candidate to sample bias/offsets/children, plus smoothed
    score statistics maintained by update_probabilities.
    """

    def __init__(self):
        # P(bias == 1) indexed by has_child in {0, 1}.
        self.p_bias = np.full(2, 0.5)
        # P(offsets[j] == 1) indexed [has_child][j].
        self.p_offsets = np.full((2, N), 0.5)
        # Probability of sampling children; flipped to 1 once confident.
        self.p_has_child = 0

        # Smoothed best scores per (has_child, parameter value) assignment.
        self.bias_coherences = np.full((2, M), 0.5)
        self.offset_coherences = np.full((2, M, N), 0.5)
        self.has_child_coherences = np.full(2, 0.5)

        # uncertainty[k] / totals[k]: mismatch counts with (k=1) and
        # without (k=0) children, accumulated by evaluate().
        self.uncertainty = np.zeros(2)
        self.totals = np.zeros(2)

        self.left = None
        self.right = None
        self.parent = None
        self.depth = 1

    def reset_uncertainty(self):
        """Zero the mismatch counters here and in all descendants.

        Skips the subtree entirely when this node saw no samples (both
        totals zero), since its children cannot have been visited either.
        """
        if self.totals[0] == 0 and self.totals[1] == 0:
            return
        self.uncertainty.fill(0)
        self.totals.fill(0)
        if self.left is not None:
            self.left.reset_uncertainty()
        if self.right is not None:
            self.right.reset_uncertainty()

    def min_p_has_child(self):
        """Fraction of observed error attributable to the childless form.

        Returns 0.5 when no mismatches were recorded at all.
        """
        without_child = self.uncertainty[0] / self.totals[0] if self.totals[0] > 0 else 0
        with_child = self.uncertainty[1] / self.totals[1] if self.totals[1] > 0 else 0

        if without_child == 0 and with_child == 0:
            return 0.5
        return without_child / (without_child + with_child)

    def confidence(self):
        """Mean squared distance of the childless-branch probabilities
        from 0.5, scaled into [0, 1]; 1 means fully saturated (0 or 1).
        """
        total = (2 * self.p_bias[0] - 1) ** 2
        for i in range(N):
            total += (2 * self.p_offsets[0][i] - 1) ** 2
        return total / (N + 1)
|
|
|
|
def clamp(x, min_value = 0.01, max_value = 1):
    """Restrict x to [min_value, max_value], applying the lower bound first."""
    lower_bounded = max(x, min_value)
    return min(lower_bounded, max_value)
|
|
|
|
def encode(v):
    """Pack a sequence of 0/1 values into a bytearray, MSB first.

    Bug fix: the original iterated over a fixed global N and indexed
    `v[i * 8 + j]` unconditionally, which raises IndexError whenever the
    bit count is not a multiple of 8. This version derives the length
    from `v` itself and zero-pads the final partial byte on the right;
    behavior is identical for the length-N inputs existing callers pass.
    """
    n_bits = len(v)
    byte_values = []
    for i in range(0, math.ceil(n_bits / 8)):
        x = 0
        for j in range(0, 8):
            index = i * 8 + j
            x <<= 1
            if index < n_bits:
                x |= int(v[index])
        byte_values.append(x)
    return bytearray(byte_values)
|
|
|
|
def sha(v):
    """SHA-256 the packed bits of v and reduce the first digest byte mod M."""
    digest = hashlib.sha256(encode(v)).digest()
    return digest[0] % M
|
|
|
|
def xor(x):
    """Parity of the elements of x, deliberately ignoring the first one."""
    return sum(x[1:]) % 2
|
|
|
|
# Fixed target function for the search. Under evaluate_candidate this tree
# computes roughly (x0 + x1) + x2 * (x3 + x4 * x5), with every sum and
# product reduced mod M at each node.
test_candidate = Candidate().addOffset(0).addOffset(1).setChild(
    Candidate().addOffset(2), Candidate().addOffset(3).setChild(
        Candidate().addOffset(4), Candidate().addOffset(5)
    ))
|
|
|
|
def eval_test_candidate(x):
    """Apply the module-level target `test_candidate` to input x."""
    return evaluate_candidate(test_candidate, x)
|
|
|
|
def hamming_distance(a, b, scratch):
    """Count positions where a and b differ; `scratch` is overwritten."""
    np.logical_xor(a, b, out=scratch)
    return sum(scratch)
|
|
|
|
def coherence(inputs, outputs, scratch):
    """Score how consistently outputs are 0 across similar inputs.

    For each sample, pairs it with every other sample, weighting each pair
    by 2^-hamming_distance; a pair counts toward the numerator only when
    both outputs are 0. Returns the mean per-sample ratio.
    """
    per_sample = []
    for i, (x_a, y_a) in enumerate(zip(inputs, outputs)):
        numerator = 0.0
        denominator = 0.0
        for j, (x_b, y_b) in enumerate(zip(inputs, outputs)):
            if j == i:
                continue
            distance = hamming_distance(x_a, x_b, scratch)
            weight = 1.0 / (2 ** distance)
            denominator += weight
            if y_a == 0 and y_b == 0:
                numerator += weight
        per_sample.append(numerator / denominator if denominator > 0 else 0)
    return sum(per_sample) / len(per_sample)
|
|
|
|
def random_sample(m, n):
    """Return an (m, n) float array of uniform random bits, drawn row-major."""
    samples = np.zeros((m, n))
    for row in range(m):
        for col in range(n):
            samples[row][col] = random.randint(0, 1)
    return samples
|
|
|
|
def evaluate_candidate(candidate, x):
    """Recursively evaluate a candidate tree on bit-vector x, mod M.

    Each node contributes bias + selected bits; a node with children adds
    the product of its two child evaluations before the final reduction.
    """
    acc = candidate.bias
    for i in range(N):
        acc += x[i] * candidate.offsets[i]
    acc %= M
    if not candidate.has_child:
        return acc
    product = evaluate_candidate(candidate.left, x) * evaluate_candidate(candidate.right, x)
    return (acc + product) % M
|
|
|
|
def evaluate(probabilities, candidate, x, z, update_uncertainty = True):
    """Evaluate candidate on x while tallying mismatch statistics vs target z.

    Returns the node's output (mod M). When update_uncertainty is True,
    disagreement with z is recorded in probabilities.uncertainty/totals
    (slot 0 for the childless form, slot 1 for the form with children),
    and re-evaluation is triggered on whichever child could account for
    the residual (presumably to attribute the error down the tree —
    matches the original behavior exactly).
    """
    global N, M
    # Affine part: bias plus the selected input bits, reduced mod M.
    value = candidate.bias
    for i in range(0, N):
        value += x[i] * candidate.offsets[i]
    value %= M
    if candidate.has_child == 0:
        if update_uncertainty:
            if value != z:
                probabilities.uncertainty[0] += 1
            probabilities.totals[0] += 1
        return value
    # e is the residual the children's product would need to supply.
    e = (value - z) % M
    left = evaluate(probabilities.left, candidate.left, x, e, False)
    right = evaluate(probabilities.right, candidate.right, x, e, False)
    if update_uncertainty:
        # Bug fix: the original had two byte-identical branches for e == 0
        # and e == 1; they are merged here. With M == 2 the guard always
        # holds, but it is kept so behavior is unchanged for other M.
        if e == 0 or e == 1:
            if left == 1 and right == 1:
                evaluate(probabilities.left, candidate.left, x, e)
                evaluate(probabilities.right, candidate.right, x, e)
            if left == 0:
                evaluate(probabilities.left, candidate.left, x, e)
            if right == 0:
                evaluate(probabilities.right, candidate.right, x, e)
    value += left * right
    value %= M
    if update_uncertainty:
        if value != z:
            probabilities.uncertainty[1] += 1
        probabilities.totals[1] += 1
    return value
|
|
|
|
def update_probabilities(probabilities, candidates, scores, depth = 1):
    """Blend this generation's best scores into the node's sampling probabilities.

    For each parameter assignment (bias value, offset bit, has_child flag),
    finds the best score among candidates carrying that assignment, smooths
    it into the matching *_coherences entry (0.9/0.1 EMA), then nudges the
    sampling probabilities toward the better-scoring assignment. Recurses
    into child nodes with the corresponding child candidates.
    """
    global N, M
    num_candidates = len(candidates)
    # NOTE(review): computed but never used below (the p_has_child update
    # that consumed it is commented out).
    min_p_has_child = probabilities.min_p_has_child()

    # Best score per (has_child z, bias value i), smoothed into bias_coherences.
    for z in range(0, 2):
        for i in range(0, M):
            bias_i_max = 0
            for k in range(0, num_candidates):
                candidate = candidates[k]
                if candidate is None:
                    continue
                if candidate.bias != i:
                    continue
                if candidate.has_child != z:
                    continue
                bias_i_max = max(bias_i_max, scores[k])
            # No candidate matched this assignment; keep the old estimate.
            if bias_i_max == 0:
                continue
            probabilities.bias_coherences[z][i] = 0.9 * probabilities.bias_coherences[z][i] + 0.1 * bias_i_max

    # Same, per (has_child z, offset value i, bit j).
    for z in range(0, 2):
        for i in range(0, M):
            for j in range(0, N):
                offset_ij_max = 0
                for k in range(0, num_candidates):
                    candidate = candidates[k]
                    if candidate is None:
                        continue
                    if candidate.offsets[j] != i:
                        continue
                    if candidate.has_child != z:
                        continue
                    offset_ij_max = max(offset_ij_max, scores[k])
                if offset_ij_max == 0:
                    continue
                probabilities.offset_coherences[z][i][j] = 0.9 * probabilities.offset_coherences[z][i][j] + 0.1 * offset_ij_max

    # Same, per has_child value.
    for i in range(0, 2):
        has_child_i_max = 0
        for k in range(0, num_candidates):
            candidate = candidates[k]
            if candidate is None:
                continue
            if candidate.has_child != i:
                continue
            has_child_i_max = max(has_child_i_max, scores[k])
        if has_child_i_max == 0:
            continue
        probabilities.has_child_coherences[i] = 0.9 * probabilities.has_child_coherences[i] + 0.1 * has_child_i_max

    # Nudge p_bias / p_offsets toward whichever value (1 vs 0) has the
    # higher smoothed coherence, then EMA the result in.
    for z in range(0, 2):
        # direction = 1 if z == 0 and probabilities.has_child_coherences[0] > probabilities.has_child_coherences[1] or z == 1 and probabilities.has_child_coherences[1] > probabilities.has_child_coherences[0] else -1
        direction = 1
        p_bias_next = clamp(probabilities.p_bias[z] + direction * (probabilities.bias_coherences[z][1] - probabilities.bias_coherences[z][0]), 0, 1)
        # if z == 0 and probabilities.has_child_coherences[0] < probabilities.has_child_coherences[1] or z == 1 and probabilities.has_child_coherences[0] > probabilities.has_child_coherences[1]:
        #     p_bias_next = 0.5
        probabilities.p_bias[z] = 0.9 * probabilities.p_bias[z] + 0.1 * p_bias_next
        for j in range(0, N):
            p_offset_next = clamp(probabilities.p_offsets[z][j] + direction * (probabilities.offset_coherences[z][1][j] - probabilities.offset_coherences[z][0][j]), 0, 1)
            # if z == 0 and probabilities.has_child_coherences[0] < probabilities.has_child_coherences[1] or z == 1 and probabilities.has_child_coherences[0] > probabilities.has_child_coherences[1]:
            #     p_offset_next = 0.5
            probabilities.p_offsets[z][j] = 0.9 * probabilities.p_offsets[z][j] + 0.1 * p_offset_next

    # direction = 1 if probabilities.parent is None or probabilities.parent.has_child_coherences[1] > probabilities.parent.has_child_coherences[0] else -1
    direction = 1
    # p_has_child_next = clamp(probabilities.p_has_child + direction * (probabilities.has_child_coherences[1] - probabilities.has_child_coherences[0]), probabilities.min_p_has_child(), 1)
    # probabilities.p_has_child = 0.9 * probabilities.p_has_child + 0.1 *
    # Once the childless parameters look saturated, freeze them to 0/1 and
    # start sampling children (p_has_child jumps straight to 1).
    if probabilities.confidence() > 0.9 and probabilities.p_has_child == 0:
        probabilities.p_bias[0] = round(probabilities.p_bias[0])
        for i in range(0, N):
            probabilities.p_offsets[0][i] = round(probabilities.p_offsets[0][i])
        probabilities.p_has_child = 1

    # if probabilities.has_child_coherences[0] > probabilities.has_child_coherences[1]:
    #     return

    # Recurse: each child node is updated against the corresponding child
    # candidates (None where a candidate has no children).
    p_left = probabilities.left
    p_right = probabilities.right
    if not p_left is None:
        left = [candidate.left if not candidate is None and candidate.has_child else None for candidate in candidates]
        if any(x is not None for x in left):
            update_probabilities(p_left, left, scores, depth + 1)
    if not p_right is None:
        right = [candidate.right if not candidate is None and candidate.has_child else None for candidate in candidates]
        if any(x is not None for x in right):
            update_probabilities(p_right, right, scores, depth + 1)
|
|
|
|
|
|
def create_candidate(probabilities, candidate):
    """Sample candidate's parameters in place from `probabilities`.

    Returns the number of speculative children created in this subtree
    (nodes sampled while their p_has_child was still below 1).
    """
    global N
    new_children = 0
    # Draw order matters for reproducibility: child flag, then bias, then
    # each offset bit, then the child subtrees (left before right).
    grow = random.random() < probabilities.p_has_child and probabilities.depth <= 4
    candidate.bias = 1 if random.random() < probabilities.p_bias[0] else 0
    for i in range(0, N):
        candidate.offsets[i] = 1 if random.random() < probabilities.p_offsets[0][i] else 0
    if not grow:
        candidate.has_child = 0
        return new_children
    if probabilities.p_has_child < 1:
        new_children += 1
    candidate.has_child = 1
    # Lazily allocate the candidate and probability children.
    if candidate.left is None:
        candidate.left = Candidate()
    if candidate.right is None:
        candidate.right = Candidate()
    depth = probabilities.depth + 1
    if probabilities.left is None:
        probabilities.left = Probabilities()
        probabilities.left.parent = probabilities
        probabilities.left.depth = depth
    if probabilities.right is None:
        probabilities.right = Probabilities()
        probabilities.right.parent = probabilities
        probabilities.right.depth = depth
    new_children += create_candidate(probabilities.left, candidate.left)
    new_children += create_candidate(probabilities.right, candidate.right)
    return new_children
|
|
|
|
def copy_candidate(src, dest):
    """Deep-copy src's parameters into dest, allocating children as needed."""
    dest.bias = src.bias
    dest.offsets[:N] = src.offsets[:N]
    dest.has_child = src.has_child
    if not src.has_child:
        return
    if dest.left is None:
        dest.left = Candidate()
    if dest.right is None:
        dest.right = Candidate()
    copy_candidate(src.left, dest.left)
    copy_candidate(src.right, dest.right)
|
|
|
|
def p(x):
    """Round x up to two decimal places (display helper)."""
    cents = math.ceil(x * 100)
    return cents / 100
|
|
|
|
def p_a(x):
    """Apply the display rounding `p` to every element of x."""
    return list(map(p, x))
|
|
|
|
def print_probabilities(probabilities, depth=0):
    """Dump the probability tree, children first, with separators at the root."""
    if depth == 0:
        print('=====================')
    # Left subtree, then right, each printing 'None' when absent.
    for subtree in (probabilities.left, probabilities.right):
        if subtree is None:
            print('None')
        else:
            print_probabilities(subtree, depth + 1)
    # One line per has_child flag value.
    for z in range(0, 2):
        print(depth, z, p(probabilities.p_bias[z]), p_a(probabilities.p_offsets[z]), p(probabilities.p_has_child), p(probabilities.confidence()))
    if depth == 0:
        print('=====================')
|
|
|
|
def main():
    """Evolve candidate trees toward the fixed target function.

    Each epoch: sample fresh candidates (survivor slots are kept), score
    every candidate by the coherence of its error pattern, fold the scores
    into the probability tree, copy the best into the survivor slots, and
    draw a new input batch.
    """
    global N, M
    sample_size = 64
    num_candidates = 100
    num_survivors = 10
    epochs = 1000
    output_xor = np.zeros(sample_size,)
    scratch = np.zeros(N,)
    g = eval_test_candidate
    expected_outputs = np.zeros((sample_size,))
    inputs = random_sample(sample_size, N)
    for i in range(0, sample_size):
        expected_outputs[i] = g(inputs[i])
    outputs = np.zeros((sample_size,))
    probabilities = Probabilities()
    candidates = [Candidate() for _ in range(0, num_candidates + num_survivors)]
    scores = np.zeros((num_candidates + num_survivors,))

    # Bug fix: `epochs` was defined but unused and the loop was `while True`,
    # so the program never terminated. Also removed unused locals
    # max_new_children / min_new_children.
    for _ in range(epochs):
        probabilities.reset_uncertainty()
        for i in range(0, len(candidates)):
            candidate = candidates[i]
            # Survivor slots (the trailing num_survivors) keep last epoch's
            # best; the rest are resampled.
            if i < num_candidates:
                create_candidate(probabilities, candidate)
            for j in range(0, sample_size):
                outputs[j] = evaluate(probabilities, candidate, inputs[j], expected_outputs[j])
            # Error pattern (0 where the candidate matched the target).
            np.subtract(outputs, expected_outputs, output_xor)
            np.mod(output_xor, M, output_xor)
            scores[i] = coherence(inputs, output_xor, scratch)
        update_probabilities(probabilities, candidates, scores)
        print_probabilities(probabilities)
        print(np.max(scores))

        # Copy the highest-scoring candidates into the survivor slots.
        top_n = sorted(range(len(scores)), key=lambda i: scores[i])[-num_survivors:]
        for i in range(0, num_survivors):
            src_index = top_n[i]
            dest_index = num_candidates + i
            if src_index == dest_index:
                continue
            copy_candidate(candidates[src_index], candidates[dest_index])

        # Fresh batch each epoch so candidates cannot overfit one sample.
        inputs = random_sample(sample_size, N)
        for i in range(0, sample_size):
            expected_outputs[i] = g(inputs[i])
|
|
|
|
|
|
# Run the evolutionary search when executed as a script.
if __name__ == "__main__":
    main()