import hashlib
import math
import numpy as np
import random
import time

N = 8
N_ACTUAL = 2 * ((N - 1) + 8)
M = 2

def vec_to_int(x):
    z = 0
    for i in range(0, len(x)):
        z <<= 1
        z |= x[i]
    return z

def timeit(f):
    def timed(*args, **kw):
        ts = time.time()
        result = f(*args, **kw)
        te = time.time()
        print('func:%r took: %2.4f sec' % (f.__name__, te - ts))
        return result
    return timed

class Candidate:
    def __init__(self, layer):
        global N_ACTUAL
        self.layer = layer
        self.offsets = np.zeros((N_ACTUAL)).astype(np.int32)

class Probabilities:
    def __init__(self, layer):
        global N_ACTUAL
        self.layer = layer
        self.p_offsets = np.zeros((N_ACTUAL))
        self.p_offsets.fill(0.5)
        self.p_offsets_next = np.zeros((N_ACTUAL))
        self.offset_coherences = np.zeros((N_ACTUAL))
        self.offset_coherences.fill(-1)
        self.knowns = set()

    def snap(self):
        # Lock in any offset whose probability has drifted clearly above chance,
        # zero its complementary pair, and re-open the remaining undecided offsets.
        reset = False
        for j in range(0, len(self.p_offsets)):
            if self.p_offsets[j] > 0.6 and self.p_offsets[j] < 0.95:
                self.p_offsets[j] = 1.0
                self.knowns.add(j)
                flip = j ^ 0b1
                self.p_offsets[flip] = 0.0
                reset = True
                break
            elif self.p_offsets[j] < 0.05:
                self.p_offsets[j] = 0.0
        if reset:
            for j in range(0, len(self.p_offsets)):
                flip = j ^ 0b1
                if self.p_offsets[j] < 0.95 and self.p_offsets[flip] < 0.95:
                    self.p_offsets[j] = 0.5

    def eliminate_random_known(self):
        if len(self.knowns) == 0:
            return False
        # random.sample() no longer accepts a set (Python 3.11+), so convert first.
        index = random.choice(tuple(self.knowns))
        self.knowns.remove(index)
        return True

    def reset(self):
        self.p_offsets.fill(0.5)
        for index in self.knowns:
            flip = index ^ 0b1
            self.p_offsets[index] = 1.0
            self.p_offsets[flip] = 0.0

    def all_zeros(self):
        for j in range(0, len(self.p_offsets)):
            if self.p_offsets[j] > 0.05 and self.p_offsets[j] < 0.95:
                return False
        return True

    def has_converged(self):
        if self.all_zeros():
            return True
        top_n = sorted(range(len(self.p_offsets)), key=lambda i: self.p_offsets[i])[-self.layer:]
        for i in top_n:
            if self.p_offsets[i] < 0.95:
                return False
        return True

    def flatten(self):
        # Collapse the probability vector into a concrete candidate; returns None
        # if the top `layer` offsets are not yet confidently selected.
        candidate = Candidate(self.layer)
        top_n = sorted(range(len(self.p_offsets)), key=lambda i: self.p_offsets[i])[-self.layer:]
        for i in top_n:
            if self.p_offsets[i] < 0.95:
                return None
            candidate.offsets[i] = 1
        return candidate

def clamp(x, min_value=0.01, max_value=1):
    return min(max(x, min_value), max_value)

def encode(v):
    # Pack a bit vector into bytes, most significant bit first.
    byte_values = []
    for i in range(0, math.ceil(len(v) / 8)):
        x = 0
        for j in range(0, 8):
            index = i * 8 + j
            if index >= len(v):
                continue
            x <<= 1
            x |= int(v[index])
        byte_values.append(x)
    return bytearray(byte_values)

# 00100111 x4
# 00000110 x1
def sha(v):
    global M
    x = encode(v)
    m = hashlib.sha256()
    m.update(x)
    result = m.digest()
    return result[0] % M

def sha_byte(v):
    x = encode(v)
    m = hashlib.sha256()
    m.update(x)
    result = m.digest()
    return result

def xor(x):
    num_one_bits = 0
    for i in range(0, len(x)):
        if i == 0:
            continue
        num_one_bits += x[i]
    return num_one_bits % 2
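
# Worked example of the helpers above (illustrative): encode() packs a bit
# vector MSB-first into bytes, so
#
#   encode([0, 0, 1, 0, 0, 1, 1, 1]) == bytearray([0b00100111])
#
# sha() then reduces the first SHA-256 digest byte mod M (a single bit when
# M == 2), while sha_byte() returns the full digest for callers that need more
# than one output bit.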

# 0 ^ 1 ^ (2 ^ (4 * (5 ^ 0 * 7))) * (3 ^ 6 * 7)
# 0 ^ 1 ^ 2 * 3 ^ 2 * 6 * 7 ^ 3 * 4 * (5 ^ 0 * 7) ^ 4 * 6 * 7 * (5 ^ 0 * 7)
# 0 ^ 1 ^ 2 * 3 ^ 2 * 6 * 7 ^ 3 * 4 * 5 ^ 0 * 3 * 4 * 7 ^ 4 * 5 * 6 * 7 ^ 0 * 4 * 6 * 7
# 0 ^ 1 ^ 2*3 ^ 2*6*7 ^ 3*4*5 ^ 0*3*4*7 ^ 4*5*6*7 ^ 0*4*6*7
# What about strictly SOP?
# That is, 1-hot of increasing complexity?
# How would that work?
# Candidate generation could apply some kind of softmax to filter down to one.

# def test_fn(x):
#     #  0   1
#     # 2  |    3
#     # 4 | 5 | 6 | 7
#     # | | 0 | 7 | | | |
#     return x[0] ^ x[1] ^ ((x[2] ^ (x[4] * (x[5] ^ (x[0] * x[7])))) * (x[3] ^ (x[6] * x[7])))

def candidate_fn(x):
    return x[0] ^ x[1] ^ (~(x[2] ^ x[3]) * x[2])

def true_fn(x):
    return x[0] ^ x[1] ^ (x[3] * x[2])

def hamming_distance(a, b, scratch):
    np.logical_xor(a, b, scratch)
    return sum(scratch)

def coherence(outputs, distances):
    # Average, over all samples, of the distance-weighted fraction of other
    # samples whose output agrees; weights fall off as 2^-hamming_distance.
    coherences = []
    for i in range(0, len(outputs)):
        y_a = outputs[i]
        numerator = 0
        denominator = 0
        for j in range(0, len(outputs)):
            if i == j:
                continue
            y_b = outputs[j]
            weight = distances[i][j]
            denominator += weight
            if (y_a == 0 and y_b == 0) or (y_a == 1 and y_b == 1):
                numerator += weight
        coherence = numerator / denominator if denominator > 0 else 0
        coherences.append(coherence)
    return sum(coherences) / len(coherences)

def random_sample(m, inputs, augmented_inputs, outputs):
    global N, N_ACTUAL
    for i in range(0, m):
        for j in range(0, N):
            val = random.randint(0, 1)
            inputs[i][j] = val
            if j > 0:
                # Augmented encoding: even slot holds the bit, odd slot its complement.
                augmented_inputs[i][(j - 1) * 2] = val
                augmented_inputs[i][(j - 1) * 2 + 1] = 1 - val
            # augmented_inputs[i][j * 2] = val
            # augmented_inputs[i][j * 2 + 1] = 1 - val
        output = sha_byte(inputs[i])
        # The target is the first random input bit; the visible inputs are then
        # overwritten with bits of the SHA-256 digest.
        outputs[i] = inputs[i][0]
        for k in range(0, 1):
            output_byte = output[k]
            for j in range(0, 8):
                val = (output_byte >> j) & 0b1
                inputs[i][k * 8 + j] = val
                augmented_inputs[i][(N - 1 + k * 8 + j) * 2] = val
                augmented_inputs[i][(N - 1 + k * 8 + j) * 2 + 1] = 1 - val
        # outputs[i] = g(inputs[i])
    return (inputs, augmented_inputs, outputs)

def populate_distances(inputs, distances, scratch):
    for i in range(0, len(inputs)):
        x_a = inputs[i]
        for j in range(0, len(inputs)):
            if i == j:
                continue
            x_b = inputs[j]
            distance = hamming_distance(x_a, x_b, scratch)
            distances[i][j] = 1.0 / (2 ** distance)

def evaluate(layers, candidate, x, compute_scratch):
    z = evaluate_layers(layers, x, compute_scratch)
    z ^= evaluate_candidate(candidate, x, compute_scratch)
    return z

def evaluate_layers(layers, x, compute_scratch):
    z = 0
    for layer in layers:
        z ^= evaluate_candidate(layer, x, compute_scratch)
    return z

def evaluate_candidate(candidate, x, compute_scratch):
    # AND of the inputs selected by candidate.offsets: 1 iff every selected bit of x is 1.
    compute_scratch.fill(0)
    compute_scratch[0:len(candidate.offsets)] = candidate.offsets
    np.multiply(compute_scratch, x, compute_scratch)
    return 1 if np.sum(compute_scratch) - np.sum(candidate.offsets) == 0 else 0

def layer_str(layer):
    parts = []
    for i in range(0, len(layer.offsets)):
        if layer.offsets[i] == 1:
            parts.append('x[' + str(i) + ']')
    return '*'.join(parts)

def cache_layers(layers):
    # Compile the accepted layers into one Python function: an XOR of AND terms.
    expr = 'def f(x):\n\tresult=0\n'
    for i in range(0, len(layers)):
        layer = layers[i]
        expr += '\tresult^=' + layer_str(layer) + '\n'
    expr += '\treturn result\n'
    scope = {}
    exec(expr, scope)
    return scope['f']
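
# For example, if two layers have offsets selecting {0} and {2, 3}, cache_layers()
# exec()-compiles source equivalent to
#
#   def f(x):
#       result = 0
#       result ^= x[0]
#       result ^= x[2]*x[3]
#       return result
#
# so the cached model is an XOR of AND terms over the augmented inputs. (The
# indices here are illustrative, not taken from a real run.)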

@timeit
def compute_scores(probabilities, candidates, num_candidates, layers, scores, distances, inputs, outputs, output_xor, expected_outputs, sample_size, int_scratch, cached_f):
    global M
    for i in range(0, sample_size):
        outputs[0][i] = cached_f(inputs[i])
        # outputs[0][i] = evaluate_layers(layers, inputs[i], int_scratch)
        # check = cached_f(inputs[i])
        # if check != outputs[0][i]:
        #     raise Exception('Mistake')
    for j in range(1, num_candidates):
        np.copyto(outputs[j], outputs[0])
    np.subtract(outputs[0], expected_outputs, output_xor)
    np.mod(output_xor, M, output_xor)
    base_score = coherence(output_xor, distances)

    scores.fill(0)
    # Deduplicate sampled candidates by their offset string so each distinct
    # candidate is only evaluated once.
    unique_candidates = {}
    for j in range(0, num_candidates):
        create_candidate(probabilities, candidates[j])
        unique_candidates[candidate_str(candidates[j])] = j
    for i in range(0, sample_size):
        for j in unique_candidates.values():
            candidate = candidates[j]
            outputs[j][i] ^= evaluate_candidate(candidate, inputs[i], int_scratch)
    for j in unique_candidates.values():
        candidate = candidates[j]
        np.subtract(outputs[j], expected_outputs, output_xor)
        np.mod(output_xor, M, output_xor)
        score = coherence(output_xor, distances)
        scores[j] = score

    # for j in range(0, num_candidates):
    #     candidate = candidates[j]
    #     create_candidate(probabilities, candidate)
    # for i in range(0, sample_size):
    #     for j in range(0, num_candidates):
    #         candidate = candidates[j]
    #         outputs[j][i] ^= evaluate_candidate(candidate, inputs[i], int_scratch)
    # for j in range(0, num_candidates):
    #     candidate = candidates[j]
    #     np.subtract(outputs[j], expected_outputs, output_xor)
    #     np.mod(output_xor, M, output_xor)
    #     score = coherence(output_xor, distances)
    #     scores[j] = score

    return base_score

def compute_uplift(candidate, layers, distances, inputs, outputs, output_xor, expected_outputs, sample_size, int_scratch):
    global M
    for i in range(0, sample_size):
        outputs[0][i] = evaluate_layers(layers, inputs[i], int_scratch)
    np.subtract(outputs[0], expected_outputs, output_xor)
    np.mod(output_xor, M, output_xor)
    base_score = coherence(output_xor, distances)
    for i in range(0, sample_size):
        outputs[0][i] ^= evaluate_candidate(candidate, inputs[i], int_scratch)
    np.subtract(outputs[0], expected_outputs, output_xor)
    np.mod(output_xor, M, output_xor)
    score = coherence(output_xor, distances)
    return (base_score, score)

@timeit
def update_probabilities(probabilities, candidates, inputs, base_score, scores, scale):
    # Pull each offset's probability toward the best coherence achieved by any
    # candidate that used it, then apply exponential smoothing and snap().
    num_candidates = len(candidates)
    probabilities.offset_coherences.fill(-1)
    for p in range(0, num_candidates):
        score = scores[p]
        if score == 0:
            continue
        candidate = candidates[p]
        for j in range(0, len(candidate.offsets)):
            if candidate.offsets[j] == 0:
                continue
            probabilities.offset_coherences[j] = max(score, probabilities.offset_coherences[j])
    inertia = 0
    for j in range(0, len(probabilities.p_offsets_next)):
        p = probabilities.offset_coherences[j]
        delta = p - base_score if p >= 0 else 0
        probabilities.p_offsets_next[j] = clamp(probabilities.p_offsets[j] + delta, 0, 1)
        inertia += abs(probabilities.p_offsets_next[j] - probabilities.p_offsets[j])
    for j in range(0, len(probabilities.p_offsets_next)):
        p_offset_next = 0.9 * probabilities.p_offsets[j] + 0.1 * probabilities.p_offsets_next[j]
        # if p_offset_next <= 0.05:
        #     p_offset_next = 0.0
        # elif p_offset_next >= 0.95:
        #     p_offset_next = 1.0
        probabilities.p_offsets[j] = p_offset_next
    # total = np.sum(probabilities.p_offsets[j])
    # probabilities.p_offsets[j] *= 1.0 / total
    probabilities.snap()
    return inertia

def create_candidate(probabilities, candidate):
    # Sample `layer` offsets, favoring high-probability ones; an offset whose
    # complementary pair is already picked is skipped, so a term never contains
    # both an input and its negation.
    candidate.offsets.fill(0)
    scores = np.empty_like(candidate.offsets).astype(np.float32)
    for j in range(0, len(probabilities.p_offsets)):
        if probabilities.p_offsets[j] == 1.0:
            scores[j] = 1000
        elif probabilities.p_offsets[j] == 0.0:
            scores[j] = -1000
        else:
            scores[j] = random.random() + probabilities.p_offsets[j]
    top = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
    picked = set()
    for i in top:
        flip = i ^ 0b1
        if flip in picked:
            continue
        candidate.offsets[i] = 1
        picked.add(i)
        if len(picked) == candidate.layer:
            return
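
# Sketch of a decoding helper (added for illustration; `describe_candidate` is
# not called anywhere in this script): in the layout built by random_sample(),
# augmented index 2k holds pair k and 2k+1 holds its complement, where pairs
# 0..N-2 are the original input bits 1..N-1 and the remaining pairs are digest
# bits. This renders a candidate's AND term in those pair coordinates.
def describe_candidate(candidate):
    terms = []
    for i in range(0, len(candidate.offsets)):
        if candidate.offsets[i] != 1:
            continue
        pair = i // 2
        negated = (i % 2) == 1
        terms.append(('~' if negated else '') + 'a[' + str(pair) + ']')
    return '*'.join(terms)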

def copy_candidate(src, dest):
    for j in range(0, len(src.offsets)):
        dest.offsets[j] = src.offsets[j]

def p(x):
    return math.ceil(x * 100) / 100

def p_a(x):
    return [p(z) for z in x]

def print_probabilities(probabilities):
    print('=====================')
    print(p_a(probabilities.p_offsets))
    print('=====================')

def candidate_str(candidate):
    build_str = ''
    for j in range(0, len(candidate.offsets)):
        build_str += str(candidate.offsets[j])
    return build_str

def main():
    global N, N_ACTUAL, M

    sample_size = 64
    num_candidates = 100
    num_survivors = 1
    uplift_sample_size = 128
    output_xor = np.zeros(sample_size,)
    scratch = np.zeros((N,))
    int_scratch = np.zeros((N,)).astype(np.int32)
    g = sha
    layers = []
    unique_layers = set()
    augment_layers = []
    layer = 1
    inputs = np.zeros((sample_size, N)).astype(np.int32)
    augmented_inputs = np.zeros((sample_size, N_ACTUAL)).astype(np.int32)
    expected_outputs = np.zeros((sample_size,)).astype(np.int32)
    random_sample(sample_size, inputs, augmented_inputs, expected_outputs)
    distances = np.zeros((sample_size, sample_size))
    populate_distances(inputs, distances, scratch)
    outputs = np.zeros((num_candidates + num_survivors, sample_size,)).astype(np.int32)
    scores = np.zeros((num_candidates + num_survivors,))
    cached_f = cache_layers(layers)

    probabilities = Probabilities(1)

    np.subtract(outputs[0], expected_outputs, output_xor)
    np.mod(output_xor, M, output_xor)
    score = coherence(output_xor, distances)

    with open('model.txt', 'w') as f:
        while score < 1:
            probabilities.layer = layer
            candidates = [Candidate(layer) for _ in range(0, num_candidates + num_survivors)]
            augmented_int_scratch = np.zeros((N_ACTUAL,)).astype(np.int32)
            random_sample(sample_size, inputs, augmented_inputs, expected_outputs)
            populate_distances(inputs, distances, scratch)
            inertia = 1
            epoch = 1
            while inertia > 0.001 and epoch < 2000 and not probabilities.has_converged():
                base_score = compute_scores(probabilities, candidates, num_candidates, layers, scores, distances, augmented_inputs, outputs, output_xor, expected_outputs, sample_size, augmented_int_scratch, cached_f)
                round_inertia = update_probabilities(probabilities, candidates, augmented_inputs, base_score, scores, 1 + 0.01 * epoch)
                inertia = 0.9 * inertia + 0.1 * round_inertia

                print_probabilities(probabilities)
                # for candidate in layers:
                #     print(candidate.offsets)
                max_score = np.max(scores)
                print(base_score, max_score, round_inertia, inertia)

                # Keep the best-scoring candidates alive in the survivor slots.
                top_n = sorted(range(len(scores)), key=lambda i: scores[i])[-num_survivors:]
                for i in range(0, num_survivors):
                    src_index = top_n[i]
                    dest_index = num_candidates + i
                    if src_index == dest_index:
                        continue
                    src = candidates[src_index]
                    dest = candidates[dest_index]
                    candidates[dest_index] = src
                    candidates[src_index] = dest

                random_sample(sample_size, inputs, augmented_inputs, expected_outputs)
                populate_distances(inputs, distances, scratch)
                epoch += 1

            candidate = probabilities.flatten()

            # uplift = -1
            # if not candidate is None:
            #     print(candidate.offsets)
            #     for j in range(0, sample_size):
            #         outputs[0][j] = evaluate(layers, candidate, augmented_inputs[j], augmented_int_scratch)
            #     np.subtract(outputs[0], expected_outputs, output_xor)
            #     np.mod(output_xor, M, output_xor)
            #     score = coherence(output_xor, distances)
            #
            #     average_base_score = 0
            #     average_score = 0
            #     for i in range(0, uplift_sample_size):
            #         (inputs, augmented_inputs, expected_outputs) = random_sample(sample_size, N, augment_layers, g, augmented_int_scratch)
            #         populate_distances(inputs, distances, scratch)
            #         (base_score, score) = compute_uplift(candidate, layers, distances, augmented_inputs, outputs, output_xor, expected_outputs, sample_size, augmented_int_scratch)
            #         average_base_score += base_score
            #         average_score += score
            #     average_base_score /= uplift_sample_size
            #     average_score /= uplift_sample_size
            #     uplift = average_score - average_base_score
            #     print(uplift)

            # if uplift <= 0:
            #     layer += 1
            #     # augment_layers = layers[1:]
            #     continue

            if candidate is None:
                # Not converged: forget one previously locked-in offset and retry,
                # or widen the terms if there is nothing left to forget.
                if probabilities.eliminate_random_known():
                    probabilities.reset()
                    continue
                layer += 1
                continue

            layer_id = candidate_str(candidate)
            if layer_id in unique_layers:
                # Duplicate layer: forget up to two knowns and retry, otherwise widen.
                if probabilities.eliminate_random_known():
                    if probabilities.eliminate_random_known():
                        probabilities.reset()
                        continue
                layer += 1
                continue

            unique_layers.add(layer_id)
            layers.append(candidate)
            cached_f = cache_layers(layers)
            probabilities.eliminate_random_known()
            probabilities.reset()

            # Persist the accepted layer: one line of augmented-input indices per term.
            for i in range(0, len(candidate.offsets)):
                if candidate.offsets[i] == 1:
                    f.write(str(i))
                    f.write(' ')
            f.write('\n')
            f.flush()

            # if layer == 1:
            #     layer += 1

    for candidate in layers:
        print(candidate.offsets)

if __name__ == "__main__":
    main()
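
# Sketch of how to read the model.txt written by main() (the parsing code below
# is an added illustration, not part of the training loop): each line holds the
# augmented-input indices of one AND term, and the model's output on an
# augmented input row `x` is the XOR of those terms.
#
#   with open('model.txt') as f:
#       terms = [[int(tok) for tok in line.split()] for line in f if line.strip()]
#   y = 0
#   for term in terms:
#       y ^= all(x[i] == 1 for i in term)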