535 lines
18 KiB
Python
535 lines
18 KiB
Python
from enum import unique
|
|
import hashlib
|
|
import math
|
|
import numpy as np
|
|
import random
|
|
import time
|
|
|
|
# Number of raw input bits drawn per sample.
N = 8

# Width of an augmented input row: every raw bit except bit 0, plus the 8 bits
# of one digest byte, each stored as a (value, complement) pair.
N_ACTUAL = 2 * (N - 1 + 8)

# Modulus applied to digest bytes and to prediction residuals.
M = 2
|
|
|
|
def vec_to_int(x):
    """Pack a sequence of bits into one integer, first element most significant.

    An empty sequence yields 0.
    """
    packed = 0
    for bit in x:
        packed = (packed << 1) | bit
    return packed
|
|
|
|
def timeit(f):
    """Decorator that prints the elapsed time of every call to *f*.

    The wrapped function's return value is passed through unchanged, and
    its name/docstring are preserved on the wrapper.
    """
    # Local import keeps this block self-contained.
    import functools

    @functools.wraps(f)
    def timed(*args, **kw):
        # perf_counter is monotonic, so the interval cannot be skewed by
        # system clock adjustments the way time.time() can.
        ts = time.perf_counter()
        result = f(*args, **kw)
        te = time.perf_counter()

        print('func:%r took: %2.4f sec' % (f.__name__, te - ts))
        return result

    return timed
|
|
|
|
class Candidate:
    """A candidate term: a 0/1 selection mask over the N_ACTUAL augmented slots.

    Attributes:
        layer: the value passed to the constructor; other code uses it as the
            number of slots to select.
        offsets: int32 array of length N_ACTUAL, all zeros on creation.
    """

    def __init__(self, layer):
        self.offsets = np.zeros(N_ACTUAL, dtype=np.int32)
        self.layer = layer
|
|
|
|
class Probabilities:
    """Per-slot selection probabilities for the term currently being searched.

    Slots come in adjacent pairs (j and j ^ 1); the methods below treat the
    two members of a pair as mutually exclusive.

    Attributes:
        layer: number of slots a finished term should select.
        p_offsets: current probability per slot, initialised to 0.5.
        p_offsets_next: scratch array for the next probability estimate.
        offset_coherences: best score seen per slot, -1 meaning "none".
        knowns: indices of slots that have been snapped to 1.0.
    """

    def __init__(self, layer):
        self.layer = layer
        self.p_offsets = np.full(N_ACTUAL, 0.5)
        self.p_offsets_next = np.zeros(N_ACTUAL)
        self.offset_coherences = np.full(N_ACTUAL, -1.0)
        self.knowns = set()

    def snap(self):
        """Commit the first slot whose probability lies in (0.6, 0.95).

        That slot is set to 1.0 and recorded in ``knowns``; its pair partner
        is set to 0.0. Slots below 0.05 met before it are floored to 0.0.
        When a slot was committed, every slot whose own and partner
        probability are both below 0.95 is reset to 0.5.
        """
        snapped = False
        for j in range(len(self.p_offsets)):
            value = self.p_offsets[j]
            if 0.6 < value < 0.95:
                self.p_offsets[j] = 1.0
                self.knowns.add(j)
                self.p_offsets[j ^ 0b1] = 0.0
                snapped = True
                break
            elif value < 0.05:
                self.p_offsets[j] = 0.0

        if snapped:
            for j in range(len(self.p_offsets)):
                partner = j ^ 0b1
                if self.p_offsets[j] < 0.95 and self.p_offsets[partner] < 0.95:
                    self.p_offsets[j] = 0.5

    def eliminate_random_known(self):
        """Forget one randomly chosen known slot.

        Only ``knowns`` is changed; call ``reset()`` to rebuild ``p_offsets``.

        Returns:
            True if a slot was removed, False if ``knowns`` was empty.
        """
        if not self.knowns:
            return False
        # random.sample() rejects sets on Python 3.11+ (TypeError), so pick
        # from an ordered sequence instead.
        index = random.choice(sorted(self.knowns))
        self.knowns.remove(index)
        return True

    def reset(self):
        """Set every slot to 0.5, then re-pin each known slot to 1.0 and its partner to 0.0."""
        self.p_offsets.fill(0.5)
        for index in self.knowns:
            self.p_offsets[index] = 1.0
            self.p_offsets[index ^ 0b1] = 0.0

    def all_zeros(self):
        """Return True when no slot is undecided, i.e. none lies strictly in (0.05, 0.95)."""
        for j in range(len(self.p_offsets)):
            if self.p_offsets[j] > 0.05 and self.p_offsets[j] < 0.95:
                return False
        return True

    def _top_slots(self):
        """Return the indices of the ``layer`` largest probabilities, ascending by probability."""
        order = sorted(range(len(self.p_offsets)), key=lambda i: self.p_offsets[i])
        return order[-self.layer:]

    def has_converged(self):
        """Return True when no slot is undecided or the top ``layer`` slots are all >= 0.95."""
        if self.all_zeros():
            return True

        for i in self._top_slots():
            if self.p_offsets[i] < 0.95:
                return False

        return True

    def flatten(self):
        """Build a Candidate selecting the top ``layer`` slots.

        Returns:
            The Candidate, or None when any of those slots is below 0.95.
        """
        candidate = Candidate(self.layer)
        for i in self._top_slots():
            if self.p_offsets[i] < 0.95:
                return None
            candidate.offsets[i] = 1

        return candidate
|
|
|
|
def clamp(x, min_value = 0.01, max_value = 1):
    """Limit *x* to the range [min_value, max_value]."""
    # Apply the lower bound first, then the upper bound.
    floored = min_value if min_value > x else x
    return max_value if max_value < floored else floored
|
|
|
|
def encode(v):
    """Pack a bit sequence into a bytearray, eight bits per byte, first bit most significant.

    A trailing group of fewer than eight bits is stored without padding, so
    its bits occupy the low end of the last byte.
    """
    total_bits = len(v)
    packed = []
    for start in range(0, total_bits, 8):
        stop = min(start + 8, total_bits)
        acc = 0
        for position in range(start, stop):
            acc = (acc << 1) | int(v[position])
        packed.append(acc)
    return bytearray(packed)
|
|
|
|
# 00100111 x4
|
|
# 00000110 x1
|
|
def sha(v):
    """Return the first byte of SHA-256(encode(v)), reduced modulo M."""
    digest = hashlib.sha256(encode(v)).digest()
    return digest[0] % M
|
|
|
|
def sha_byte(v):
    """Return the full SHA-256 digest (bytes) of the packed bit vector *v*."""
    hasher = hashlib.sha256(encode(v))
    return hasher.digest()
|
|
|
|
def xor(x):
    """Return the parity (sum modulo 2) of every element of *x* except the first."""
    ones = sum(x[position] for position in range(1, len(x)))
    return ones % 2
|
|
|
|
|
|
# 0 ^ 1 ^ (2 ^ (4 * (5 ^ 0 * 7))) * (3 ^ 6 * 7)
|
|
# 0 ^ 1 ^ 2 * 3 ^ 2 * 6 * 7 ^ 3 * 4 * (5 ^ 0 * 7) ^ 4 * 6 * 7 * (5 ^ 0 * 7)
|
|
# 0 ^ 1 ^ 2 * 3 ^ 2 * 6 * 7 ^ 3 * 4 * 5 ^ 0 * 3 * 4 * 7 ^ 4 * 5 * 6 * 7 ^ 0 * 4 * 6 * 7
|
|
|
|
# 0 ^ 1 ^ 2*3 ^ 2*6*7 ^ 3*4*5 ^ 0*3*4*7 ^ 4*5*6*7 ^ 0*4*6*7
|
|
# What about strictly SOP?
|
|
# That is, 1-Hot of increasing complexity?
|
|
# How would that work?
|
|
# Candidate generation could apply some kind of softmax to filter down to one
|
|
#
|
|
def test_fn(x):
|
|
# 0 1
|
|
# 2 | 3
|
|
# 4 | 5 | 6 | 7
|
|
# | | 0 | 7 | | | |
|
|
return x[0] ^ x[1] ^ ((x[2] ^ (x[4] * (x[5] ^ (x[0] * x[7])))) * (x[3] ^ (x[6] * x[7])))
|
|
|
|
def candidate_fn(x):
    """Return x0 ^ x1 ^ (NOT(x2 ^ x3) AND x2) for 0/1 inputs.

    The result is always 0 or 1.
    """
    # `1 ^ bit` is the single-bit complement. `~bit` on an int is the two's
    # complement (-1 or -2), which made the result negative; the old and new
    # values agree modulo 2.
    return x[0] ^ x[1] ^ ((1 ^ (x[2] ^ x[3])) * x[2])
|
|
|
|
def true_fn(x):
    """Return x0 ^ x1 ^ (x3 * x2)."""
    product = x[3] * x[2]
    parity = x[0] ^ x[1]
    return parity ^ product
|
|
|
|
def hamming_distance(a, b, scratch):
    """Return the number of positions at which *a* and *b* differ.

    Args:
        a, b: equal-length arrays of bits.
        scratch: preallocated array of the same length; it is overwritten
            with the element-wise logical XOR of *a* and *b*.
    """
    np.logical_xor(a, b, out=scratch)
    # Reduce inside numpy rather than iterating the array with builtin sum().
    return np.sum(scratch)
|
|
|
|
def coherence(outputs, distances):
    """Return the mean, over samples, of the weighted share of other samples with the same output.

    For sample i, every other sample j contributes weight distances[i][j];
    the weight counts as agreement when both outputs are 0 or both are 1.
    A sample whose total weight is not positive contributes 0.
    """
    count = len(outputs)
    per_sample = []
    for i in range(count):
        anchor = outputs[i]
        agree_weight = 0
        total_weight = 0
        for j in range(count):
            if j != i:
                other = outputs[j]
                weight = distances[i][j]
                total_weight += weight
                both_zero = anchor == 0 and other == 0
                both_one = anchor == 1 and other == 1
                if both_zero or both_one:
                    agree_weight += weight
        if total_weight > 0:
            per_sample.append(agree_weight / total_weight)
        else:
            per_sample.append(0)
    return sum(per_sample) / len(per_sample)
|
|
|
|
def random_sample(m, inputs, augmented_inputs, outputs):
    """Fill the first *m* rows of the three arrays, in place, with fresh random samples.

    For each row: N random bits are drawn and hashed; the label is the
    first drawn bit; the row of `inputs` is then overwritten with the bits
    of the first digest byte.

    NOTE(review): the source this was recovered from had lost its
    indentation; the nesting below is the reconstruction in which the hash
    is computed once per row, after all N bits are drawn — confirm.

    Returns:
        The same (inputs, augmented_inputs, outputs) objects that were passed in.
    """
    global N, N_ACTUAL
    for i in range(0, m):
        for j in range(0, N):
            val = random.randint(0, 1)
            inputs[i][j] = val
            # Bit 0 is left out of the augmented row; every other bit is
            # stored as a (value, complement) pair.
            if j > 0:
                augmented_inputs[i][(j - 1) * 2] = val
                augmented_inputs[i][(j - 1) * 2 + 1] = 1 - val
            # augmented_inputs[i][j * 2] = val
            # augmented_inputs[i][j * 2 + 1] = 1 - val
        output = sha_byte(inputs[i])
        # The label must be read here, before inputs[i] is overwritten below.
        outputs[i] = inputs[i][0]
        # Only the first digest byte is used (k ranges over a single value).
        for k in range(0, 1):
            output_byte = output[k]
            for j in range(0, 8):
                # Bits are taken least-significant first.
                val = (output_byte >> j) & 0b1;
                inputs[i][k * 8 + j] = val
                # Digest bits go after the N - 1 raw-bit pairs.
                augmented_inputs[i][(N - 1 + k * 8 + j) * 2] = val
                augmented_inputs[i][(N - 1 + k * 8 + j) * 2 + 1] = 1 - val
        # outputs[i] = g(inputs[i])
    return (inputs, augmented_inputs, outputs)
|
|
|
|
def populate_distances(inputs, distances, scratch):
    """Fill the off-diagonal of *distances* with 1 / 2**hamming(inputs[i], inputs[j]).

    Diagonal entries are left untouched. *scratch* is a work buffer handed
    to hamming_distance.
    """
    count = len(inputs)
    for row in range(count):
        first = inputs[row]
        for col in range(count):
            if row != col:
                gap = hamming_distance(first, inputs[col], scratch)
                distances[row][col] = 1.0 / (2 ** gap)
|
|
|
|
def evaluate(layers, candidate, x, compute_scratch):
    """Return the XOR of the accepted layers' output and *candidate*'s output on *x*."""
    accepted = evaluate_layers(layers, x, compute_scratch)
    proposed = evaluate_candidate(candidate, x, compute_scratch)
    return accepted ^ proposed
|
|
|
|
def evaluate_layers(layers, x, compute_scratch):
    """Return the XOR of evaluate_candidate over every layer (0 when *layers* is empty)."""
    parity = 0
    for term in layers:
        parity = parity ^ evaluate_candidate(term, x, compute_scratch)
    return parity
|
|
|
|
def evaluate_candidate(candidate, x, compute_scratch):
    """Return 1 if *x* is non-zero at every slot the candidate selects, else 0.

    *compute_scratch* is overwritten: it ends up holding the candidate mask
    multiplied element-wise by *x*.
    """
    mask = candidate.offsets
    compute_scratch.fill(0)
    compute_scratch[:len(mask)] = mask
    np.multiply(compute_scratch, x, out=compute_scratch)
    # The masked sum equals the mask's own sum only if no selected slot was zeroed.
    selected_hits = np.sum(compute_scratch)
    selected_total = np.sum(mask)
    if selected_hits - selected_total == 0:
        return 1
    return 0
|
|
|
|
def layer_str(layer):
    """Render the layer's selected slots as a product expression, e.g. 'x[0]*x[2]'.

    Returns an empty string when no slot is selected.
    """
    factors = (
        'x[%d]' % slot
        for slot, flag in enumerate(layer.offsets)
        if flag == 1
    )
    return '*'.join(factors)
|
|
|
|
def cache_layers(layers):
    """Compile the layer list into a single function f(x).

    The generated function XORs together one product expression per layer
    (as rendered by layer_str) and returns the result; with no layers it
    returns 0.
    """
    source_parts = ['def f(x):\n\tresult=0\n']
    for layer in layers:
        source_parts.append('\tresult^=' + layer_str(layer) + '\n')
    source_parts.append('\treturn result\n')

    namespace = {}
    # The source is generated entirely from layer masks, not external input.
    exec(''.join(source_parts), namespace)
    return namespace['f']
|
|
|
|
@timeit
def compute_scores(probabilities, candidates, num_candidates, layers, scores, distances, inputs, outputs, output_xor, expected_outputs, sample_size, int_scratch, cached_f):
    """Draw fresh candidates and score each distinct one on the current sample.

    Mutates `outputs`, `output_xor`, `scores` and the first `num_candidates`
    entries of `candidates` in place.

    NOTE(review): indentation was lost in the recovered source; the loop
    nesting below is a reconstruction — confirm against the original.

    Returns:
        The coherence of the accepted layers alone (before any candidate is added).
    """
    global M

    # Row 0 holds the prediction of the already-accepted layers.
    for i in range(0, sample_size):
        outputs[0][i] = cached_f(inputs[i])
        # outputs[0][i] = evaluate_layers(layers, inputs[i], int_scratch)
        # check = cached_f(inputs[i])
        # if check != outputs[0][i]:
        #     raise Exception('Mistake')
    # Every other candidate row starts from that same baseline.
    for j in range(1, num_candidates):
        np.copyto(outputs[j], outputs[0])
    # Residual between prediction and target, reduced modulo M.
    np.subtract(outputs[0], expected_outputs, output_xor)
    np.mod(output_xor, M, output_xor)
    base_score = coherence(output_xor, distances)

    scores.fill(0)
    # Maps each distinct mask string to the LAST candidate index that produced
    # it; earlier duplicates are not evaluated and keep a score of 0.
    unique_candidates = {}
    for j in range(0, num_candidates):
        create_candidate(probabilities, candidates[j])
        unique_candidates[candidate_str(candidates[j])] = j

    # XOR each distinct candidate's output into its own row.
    for i in range(0, sample_size):
        for _, j in unique_candidates.items():
            candidate = candidates[j]
            outputs[j][i] ^= evaluate_candidate(candidate, inputs[i], int_scratch)
    for _, j in unique_candidates.items():
        candidate = candidates[j]
        np.subtract(outputs[j], expected_outputs, output_xor)
        np.mod(output_xor, M, output_xor)
        score = coherence(output_xor, distances)
        scores[j] = score
    # Earlier version that evaluated every candidate, duplicates included:
    # for j in range(0, num_candidates):
    #     candidate = candidates[j]
    #     create_candidate(probabilities, candidate)

    # for i in range(0, sample_size):
    #     for j in range(0, num_candidates):
    #         candidate = candidates[j]
    #         outputs[j][i] ^= evaluate_candidate(candidate, inputs[i], int_scratch)

    # for j in range(0, num_candidates):
    #     candidate = candidates[j]
    #     np.subtract(outputs[j], expected_outputs, output_xor)
    #     np.mod(output_xor, M, output_xor)
    #     score = coherence(output_xor, distances)
    #     scores[j] = score

    return base_score
|
|
|
|
|
|
def compute_uplift(candidate, layers, distances, inputs, outputs, output_xor, expected_outputs, sample_size, int_scratch):
    """Score the accepted layers with and without *candidate* on the current sample.

    Row 0 of *outputs* and all of *output_xor* are overwritten.

    Returns:
        (base_score, score): coherence without the candidate, then with it.
    """
    predictions = outputs[0]

    # Baseline: the accepted layers only.
    for row in range(sample_size):
        predictions[row] = evaluate_layers(layers, inputs[row], int_scratch)
    np.subtract(predictions, expected_outputs, out=output_xor)
    np.mod(output_xor, M, out=output_xor)
    base_score = coherence(output_xor, distances)

    # Same predictions with the candidate XORed in.
    for row in range(sample_size):
        predictions[row] ^= evaluate_candidate(candidate, inputs[row], int_scratch)
    np.subtract(predictions, expected_outputs, out=output_xor)
    np.mod(output_xor, M, out=output_xor)
    score = coherence(output_xor, distances)

    return (base_score, score)
|
|
|
|
@timeit
def update_probabilities(probabilities, candidates, inputs, base_score, scores, scale):
    """Move each slot's probability toward the best score seen for that slot.

    For every slot, the best score among the non-zero-scored candidates
    that selected it is compared with *base_score*; the difference is
    added to the current probability (limited to [0, 1]) and blended in with
    weight 0.1. Finally probabilities.snap() is called.

    *inputs* and *scale* are accepted but not used.

    Returns:
        The summed absolute difference between the proposed and the
        previous probabilities (measured before blending).
    """
    best_seen = probabilities.offset_coherences
    best_seen.fill(-1)
    for idx in range(len(candidates)):
        candidate_score = scores[idx]
        if candidate_score == 0:
            continue
        chosen = candidates[idx].offsets
        for slot in range(len(chosen)):
            if chosen[slot] != 0:
                best_seen[slot] = max(candidate_score, best_seen[slot])

    current = probabilities.p_offsets
    proposed = probabilities.p_offsets_next
    inertia = 0
    for slot in range(len(proposed)):
        best = best_seen[slot]
        # A negative value means no scored candidate used this slot.
        delta = best - base_score if best >= 0 else 0
        proposed[slot] = clamp(current[slot] + delta, 0, 1)
        inertia += abs(proposed[slot] - current[slot])

    for slot in range(len(proposed)):
        current[slot] = 0.9 * current[slot] + 0.1 * proposed[slot]

    probabilities.snap()

    return inertia
|
|
|
|
def create_candidate(probabilities, candidate):
    """Overwrite *candidate*'s mask with a random selection of candidate.layer slots.

    Slots with probability exactly 1.0 rank first and those at exactly 0.0
    rank last; every other slot is ranked by its probability plus uniform
    noise. Slots are taken in rank order, skipping any whose pair partner
    (index ^ 1) has already been taken.
    """
    candidate.offsets.fill(0)
    weights = np.empty_like(candidate.offsets).astype(np.float32)
    for slot, prob in enumerate(probabilities.p_offsets):
        if prob == 1.0:
            weights[slot] = 1000
        elif prob == 0.0:
            weights[slot] = -1000
        else:
            weights[slot] = random.random() + prob

    ranking = sorted(range(len(weights)), key=weights.__getitem__, reverse = True)
    chosen = set()
    for slot in ranking:
        if (slot ^ 0b1) in chosen:
            continue
        candidate.offsets[slot] = 1
        chosen.add(slot)
        if len(chosen) == candidate.layer:
            return
|
|
|
|
def copy_candidate(src, dest):
    """Copy src.offsets into dest.offsets element by element."""
    for slot, value in enumerate(src.offsets):
        dest.offsets[slot] = value
|
|
|
|
def p(x):
    """Round *x* up to two decimal places."""
    hundredths = math.ceil(x * 100)
    return hundredths / 100
|
|
|
|
def p_a(x):
    """Apply p() to every element of *x* and return the results as a list."""
    return list(map(p, x))
|
|
|
|
def print_probabilities(probabilities):
    """Print the rounded slot probabilities between two separator lines."""
    separator = '====================='
    print(separator)
    print(p_a(probabilities.p_offsets))
    print(separator)
|
|
|
|
def candidate_str(candidate):
    """Return the candidate's mask as a string, one character group per slot (e.g. '0110')."""
    return ''.join(str(flag) for flag in candidate.offsets)
|
|
|
|
def main():
    """Search for terms one at a time and append each accepted term to model.txt.

    Each pass of the outer loop runs an inner loop that repeatedly draws
    candidates, scores them on a fresh random sample and updates the slot
    probabilities, until the probabilities converge or stall. The converged
    probabilities are flattened into a candidate term, which is accepted
    if it is new.

    NOTE(review): indentation was lost in the recovered source; the nesting
    below (in particular of the `continue` / `layer += 1` branches and of
    the final print loop) is a reconstruction — confirm against the original.
    """
    global N, N_ACTUAL, M
    sample_size = 64
    num_candidates = 100
    num_survivors = 1
    uplift_sample_size = 128  # only referenced by the commented-out uplift check
    output_xor = np.zeros(sample_size,)
    scratch = np.zeros((N,))
    int_scratch = np.zeros((N,)).astype(np.int32)  # not used below
    g = sha  # not used below
    layers = []  # accepted terms, in order of acceptance
    unique_layers = set()  # mask strings of the accepted terms
    augment_layers = []  # not used below
    layer = 1  # number of slots the next term should select
    inputs = np.zeros((sample_size, N)).astype(np.int32)
    augmented_inputs = np.zeros((sample_size, N_ACTUAL)).astype(np.int32)
    expected_outputs = np.zeros((sample_size,)).astype(np.int32)
    random_sample(sample_size, inputs, augmented_inputs, expected_outputs)
    distances = np.zeros((sample_size, sample_size))
    populate_distances(inputs, distances, scratch)
    # One output row and one score per candidate, plus the survivor slots.
    outputs = np.zeros((num_candidates + num_survivors, sample_size,)).astype(np.int32)
    scores = np.zeros((num_candidates + num_survivors,))
    cached_f = cache_layers(layers)
    probabilities = Probabilities(1)

    # Score of the all-zero prediction against the first sample.
    np.subtract(outputs[0], expected_outputs, output_xor)
    np.mod(output_xor, M, output_xor)
    score = coherence(output_xor, distances)

    with open('model.txt', 'w') as f:
        # NOTE(review): `score` is never reassigned inside this loop (the only
        # update is in the commented-out block), so once entered the loop does
        # not terminate on its own — confirm this is intended.
        while score < 1:
            probabilities.layer = layer
            candidates = [Candidate(layer) for _ in range(0, num_candidates + num_survivors)]
            augmented_int_scratch = np.zeros((N_ACTUAL,)).astype(np.int32)
            random_sample(sample_size, inputs, augmented_inputs, expected_outputs)
            populate_distances(inputs, distances, scratch)

            inertia = 1
            epoch = 1
            while inertia > 0.001 and epoch < 2000 and not probabilities.has_converged():
                base_score = compute_scores(probabilities, candidates, num_candidates, layers, scores, distances, augmented_inputs, outputs, output_xor, expected_outputs, sample_size, augmented_int_scratch, cached_f)
                round_inertia = update_probabilities(probabilities, candidates, augmented_inputs, base_score, scores, 1 + 0.01 * epoch)
                # Exponential moving average of the per-round change.
                inertia = 0.9 * inertia + 0.1 * round_inertia

                print_probabilities(probabilities)
                # for candidate in layers:
                #     print(candidate.offsets)
                max_score = np.max(scores)
                print(base_score, max_score,round_inertia, inertia)

                top_n = sorted(range(len(scores)), key=lambda i: scores[i])[-num_survivors:]

                # Swap the best-scoring candidates into the survivor slots at
                # the end of the list.
                for i in range(0, num_survivors):
                    src_index = top_n[i]
                    dest_index = num_candidates + i
                    if src_index == dest_index:
                        continue
                    src = candidates[src_index]
                    dest = candidates[dest_index]
                    candidates[dest_index] = src
                    candidates[src_index] = dest

                # Each epoch is evaluated on a fresh random sample.
                random_sample(sample_size, inputs, augmented_inputs, expected_outputs)
                populate_distances(inputs, distances, scratch)
                epoch += 1

            candidate = probabilities.flatten()
            # uplift = -1
            # if not candidate is None:
            #     print(candidate.offsets)
            #     for j in range(0, sample_size):
            #         outputs[0][j] = evaluate(layers, candidate, augmented_inputs[j], augmented_int_scratch)
            #     np.subtract(outputs[0], expected_outputs, output_xor)
            #     np.mod(output_xor, M, output_xor)
            #     score = coherence(output_xor, distances)

            #     average_base_score = 0
            #     average_score = 0
            #     for i in range(0, uplift_sample_size):
            #         (inputs, augmented_inputs, expected_outputs) = random_sample(sample_size, N, augment_layers, g, augmented_int_scratch)
            #         populate_distances(inputs, distances, scratch)
            #         (base_score, score) = compute_uplift(candidate, layers, distances, augmented_inputs, outputs, output_xor, expected_outputs, sample_size, augmented_int_scratch)
            #         average_base_score += base_score
            #         average_score += score
            #     average_base_score /= uplift_sample_size
            #     average_score /= uplift_sample_size
            #     uplift = average_score - average_base_score
            #     print(uplift)

            # if uplift <= 0:
            #     layer += 1
            #     # augment_layers = layers[1:]
            #     continue
            # No usable term: drop one known slot and retry, or, when nothing
            # is known, look for a term that selects one more slot.
            if candidate is None:
                if probabilities.eliminate_random_known():
                    probabilities.reset()
                    continue
                layer += 1
                continue

            layer_id = candidate_str(candidate)
            # The term was already accepted: drop two known slots and retry;
            # if two could not be dropped, move on to a larger term.
            if layer_id in unique_layers:
                if probabilities.eliminate_random_known():
                    if probabilities.eliminate_random_known():
                        probabilities.reset()
                        continue
                layer += 1
                continue

            # Accept the term and recompile the cached evaluator.
            unique_layers.add(layer_id)
            layers.append(candidate)
            cached_f = cache_layers(layers)
            probabilities.eliminate_random_known()
            probabilities.reset()

            # One line per accepted term: its selected slot indices, space-separated.
            for i in range(0, len(candidate.offsets)):
                if candidate.offsets[i] == 1:
                    f.write(str(i))
                    f.write(' ')
            f.write('\n')
            f.flush()

            # if layer == 1:
            #     layer += 1

    for candidate in layers:
        print(candidate.offsets)
|
|
|
|
# Run the search only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()