from math import isnan
import numpy as np
import random
import hashlib
import math


def get_state_id(state):
    return ','.join([str(x) for x in sorted(state)])


class Point():
    # A sampled input/output pair: x is the bit vector, y the observed label.
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def id(self):
        return ','.join([str(int(x)) for x in self.x])


class Influence():
    # Tracks the bit positions (degrees of freedom) on which two points differ.
    def __init__(self, a, b):
        self.a = a
        self.b = b
        self.original_dof = set()
        self.dof = set()
        for i in range(0, len(a.x)):
            if a.x[i] != b.x[i]:
                self.original_dof.add(i)
                self.dof.add(i)

    def coherent(self):
        return self.a.y == self.b.y

    def id(self):
        return ','.join(sorted([self.a.id(), self.b.id()]))


def encode(v):
    # Pack a bit vector into bytes, most significant bit first within each byte.
    byte_values = []
    for i in range(0, math.ceil(len(v) / 8)):
        x = 0
        for j in range(0, 8):
            index = i * 8 + j
            if index >= len(v):
                continue
            x <<= 1
            x |= int(v[index])
        byte_values.append(x)
    return bytearray(byte_values)


def decode(x, N):
    # Unpack the low N bits of an integer into a vector, least significant bit first.
    index = 0
    output = np.zeros((N))
    while x > 0 and index < N:
        output[index] = x & 0b1
        x >>= 1
        index += 1
    return output


def sha(v):
    x = encode(v)
    m = hashlib.sha256()
    m.update(x)
    result = m.digest()
    return result[0] & 0b1


def hamming_distance(a, b):
    return np.sum(np.logical_xor(a.x, b.x))


def random_x(N):
    x = np.zeros((N))
    for i in range(0, N):
        x[i] = random.randint(0, 1)
    return x


def xor_n(x, n):
    # Parity of the first n bits.
    return sum(x[:n]) % 2


def create_dof_map(influences):
    dof_map = {}
    for influence in influences:
        for i in influence.dof:
            if i not in dof_map:
                dof_map[i] = []
            dof_map[i].append(influence)
    return dof_map


def flip(influences, i):
    for influence in influences:
        if i in influence.dof:
            influence.a.y = int(influence.a.y) ^ 1


def remove_dof(dof_map, i, flip=False):
    for influence in dof_map[i]:
        influence.dof.remove(i)
        if flip:
            influence.a.y = int(influence.a.y) ^ 1
        # if len(influence.dof) == 0 and not influence.coherent():
        #     raise Exception('Invalid')
    del dof_map[i]


def solve(dof_map, all_influences, all_samples):
    # Eliminate influences that are down to a single degree of freedom, then score the
    # candidate flips of the lowest-degree incoherent influence that remains.
    eliminated = True
    while eliminated:
        eliminated = False
        for influence in all_influences:
            if len(influence.dof) == 1:
                i = next(iter(influence.dof))
                if influence.coherent():
                    remove_dof(dof_map, i)
                    eliminated = True
                else:
                    print('Forced', i)
                    remove_dof(dof_map, i, True)
                    eliminated = True

    lowest_dof = None
    for influence in all_influences:
        if not influence.coherent() and len(influence.dof) > 1:
            if lowest_dof is None or len(influence.dof) < len(lowest_dof.dof):
                lowest_dof = influence

    flip = None
    highest_score = -1
    for i in lowest_dof.dof:
        per_point_scores = {}
        i_influences = dof_map[i]
        left = 0
        right = 0
        for influence in i_influences:
            if influence.a not in per_point_scores:
                per_point_scores[influence.a] = [0, 0]
            if influence.b not in per_point_scores:
                per_point_scores[influence.b] = [0, 0]
            if influence.coherent():
                per_point_scores[influence.a][0] += 1
                per_point_scores[influence.b][0] += 1
                left += 1
            else:
                per_point_scores[influence.a][1] += 1
                per_point_scores[influence.b][1] += 1
                right += 1
        print(i, left / (left + right))
        num = 0
        denom = 0
        for _, score in per_point_scores.items():
            if score[0] == score[1]:
                continue
            print(i, score)
            num += score[1] / (score[0] + score[1])
            denom += 1
        score = num / denom if denom > 0 else 0
        print(score)
    return None

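
# Illustrative sketch, not called anywhere: shows how Point, Influence, and create_dof_map
# fit together on a tiny example. The helper name `_influence_demo` is new to this file.
def _influence_demo():
    a = Point(np.array([0, 1, 0, 1]), 0)
    b = Point(np.array([1, 1, 0, 0]), 1)
    pair = Influence(a, b)
    # The points differ in bits 0 and 3, so those are the pair's degrees of freedom;
    # the labels differ, so the pair is currently incoherent.
    print(pair.id(), pair.dof, pair.coherent())
    # create_dof_map indexes influences by each bit position they still depend on.
    print(create_dof_map([pair]))
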

# 1st row (n+1 choose k+1) * (1-(k mod 2))
# pseudopascal to compute the follow-on rows
# assuming solvability, we want to maximize the probability that our current state and our state with
# a particular single flip are one order apart in the correct direction
# 2, 0
# 2, 2, 0
# 2, 4, 2, 0
# 2, 6, 6, 2, 0
# 2, 8,12, 8, 2, 0
# 2,10,20,20,10, 2, 0
# 3,-9,19,-33,51,-73,99
# 3,-6,10,-14,18,-22,26
# 3,-3, 4, -4, 4, -4, 4
# 3, 0, 1, 0, 0, 0, 0
# 3, 3, 1, 1, 0, 0, 0
# 3, 6, 4, 2, 1, 0, 0
# 3, 9,10, 6, 3, 1, 0
# 4, 0, 4, 0
# 4, 4, 4, 4, 0
# 4, 8, 8, 8, 4, 0
# 4,12,16,16,12, 4, 0
# 5, 0,10, 0, 1
# 5, 5,10,10, 1, 1
# 5,
# 5,
# 3
#
# @1 [1, 2, 1]
# @2 [2, 2, 0]
# @3 [3, 0, 1]
# 5 [5, 10, 10, 5, 1] (5 choose 1, 5 choose 2, ...)
#
# @1 [1, 4, 6, 4, 1], [4, 6, 4, 1, 0] - 16, 15 - binomial (4 choose 0, 4 choose 1, 4 choose 2),
# @2 [2, 6, 6, 2, 0], [3, 4, 4, 3, 1] - 16, 15 - (4 choose 1) + (2 choose -1) - (2 choose 1)
# @3 [3, 6, 4, 2, 1], [2, 4, 6, 3, 0] - 16, 15 - (4 choose 2) + (2 choose -2) - (2 choose 2) + (2 choose -1) - (2 choose 1)
# @4 [4, 4, 4, 4, 0], [1, 6, 6, 1, 1] - 16, 15 -
# @5 [5, 0, 10, 0, 1], [0, 10, 0, 5, 0] - 16, 15 -
# @0 [0.0, 0.0, 0.0, 0.0, 0.0]
# @1 [0.2, 0.4, 0.6, 0.8, 1.0]
# @2 [0.4, 0.6, 0.6, 0.4, 0.0]
# @3 [0.6, 0.6, 0.4, 0.4, 1.0]
# @4 [0.8, 0.4, 0.4, 0.8, 0.0]
# @5 [1.0, 0.0, 1.0, 0.0, 1.0]
# 6
#
# @1 [1, 5, 10, 10, 5, 1]
# @2 [2, 8, 12, 8, 2, 0]
# @3 [3, 9, 10, 6, 3, 1]
# @4 [4, 8, 8, 8, 4, 0]
# @5 [5, 5, 10, 10, 1, 1]
# @6 [6, 0, 20, 0, 6, 0]
# last row, 1 if odd, 0 if even
# second to last, subtract 2 on odds, add 2 on evens
def compute_pseudopascal(N):
    dist = np.zeros((N, N))
    for j in range(0, N):
        dist[0][j] = math.comb(N - 1, j)
        dist[-1][j] = math.comb(N, j + 1) * (1 - (j % 2))
    for i in range(1, N):
        for j in range(0, i + 1):
            dist[i][j] = math.comb(i + 1, j + 1) * (1 - (j % 2))
        for k in range(i + 1, N):
            for j in reversed(range(0, k)):
                dist[i][j + 1] = dist[i][j] + dist[i][j + 1]
    return dist


def compute_distributions(N):
    dist = compute_pseudopascal(N)
    print(dist)
    for i in range(0, N):
        for j in range(0, N):
            denom = math.comb(N, j + 1)
            dist[i][j] /= denom
    return dist

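
# Illustrative sketch, not called anywhere: prints the pseudopascal rows for N = 6 so they
# can be compared against the @1..@6 table in the comment block above (row i corresponds to
# @i+1, e.g. the first row is the binomials [1, 5, 10, 10, 5, 1]). The helper name
# `_pseudopascal_demo` is new to this file.
def _pseudopascal_demo():
    print(compute_pseudopascal(6))
    # compute_distributions divides column j by C(6, j + 1), turning counts into fractions.
    print(compute_distributions(6))
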

def confusion_probabilities(N, samples):
    sample_sizes = np.zeros(N)
    for i in range(0, len(samples)):
        a = samples[i]
        for j in range(0, len(samples)):
            b = samples[j]
            if i == j:
                continue
            distance = hamming_distance(a, b)
            sample_sizes[distance - 1] += 1

    confusion = np.zeros((N, N))
    dist = compute_pseudopascal(N)
    np.multiply(dist, 2 ** N, dist)
    # These are the probabilities that we might mix up any two orders given a particular sample size
    for i in range(0, N):
        for j in range(0, N):
            probability = 1.0
            for k in range(0, N):
                full_size = math.comb(N, k + 1) * (2 ** N)
                sample_size = sample_sizes[k]
                num_unknowns = full_size - sample_size
                i_incoherent = dist[i][k]
                # Worst case, we sample only the coherent points
                i_min = max(i_incoherent - num_unknowns, 0) / full_size
                i_max = min(sample_size, i_incoherent) / full_size
                u = (i_min + i_max) / 2
                s = (i_max - i_min) / 2
                probability *= raised_cosine(dist[j][k] / full_size, u, s)
            confusion[i][j] = probability
    return confusion


def raised_cosine(x, u, s):
    if s == 0:
        # Degenerate support: all of the mass sits at u.
        return 1.0 if x == u else 0.0
    if x < (u - s):
        return 0
    if x > (u + s):
        return 0
    return 1.0 / (2.0 * s) * (1 + math.cos(math.pi * (x - u) / s))


# Probability of getting k red balls after drawing n from a bag with m total balls and j red balls in it
# (n choose k) * p^k * (1-p)^(n-k)
# j/m chance of getting a red ball
# (1 - j/m) chance of not getting a red ball
# One way (j/m) * ((j-1)/(m-1)) * ((j-2)/(m-2))
# (1 - (j/m))
def p_bernoulli(n, k, m, j):
    # probabilities = np.zeros((n + 1, n + 1))
    # probabilities.fill(-1)
    # # if n == k:
    # #     return 1.0
    # # if k > p:
    # #     return 0.0
    # stack = [(0, 0)]
    # while len(stack) > 0:
    #     (a, b) = stack.pop()
    #     if a + b == n:
    #         probabilities[a][b] = 1 if a == k else 0
    #     elif a > j:
    #         probabilities[a][b] = 0
    #     elif b > (m - j):
    #         probabilities[a][b] = 0
    #     else:
    #         p_left = probabilities[a + 1][b]
    #         p_right = probabilities[a][b + 1]
    #         if p_left >= 0 and p_right >= 0:
    #             p = (j - a) / (m - a - b)
    #             probabilities[a][b] = p_left * p + p_right * (1 - p)
    #         else:
    #             stack.append((a, b))
    #             if p_left < 0:
    #                 stack.append((a + 1, b))
    #             if p_right < 0:
    #                 stack.append((a, b + 1))
    # return probabilities[0][0]
    p = j / m
    P = 1.0
    p_k = 0
    p_nk = 0
    for i in range(1, k + 1):
        P *= (n + 1 - i) / i
        while P > 1.0 and p_k < k:
            P *= p
            p_k += 1
        while P > 1.0 and p_nk < (n - k):
            P *= (1 - p)
            p_nk += 1
    while p_k < k:
        P *= p
        p_k += 1
    while p_nk < (n - k):
        P *= (1 - p)
        p_nk += 1
    return P

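
# Illustrative sketch, not called anywhere: p_bernoulli(n, k, m, j) evaluates the binomial
# probability C(n, k) * p^k * (1-p)^(n-k) with p = j / m, interleaving large and small factors
# to keep the running product bounded. The helper name `_p_bernoulli_demo` is new to this file.
def _p_bernoulli_demo():
    # Drawing n = 4 with success probability 5/10: P(k = 2) = C(4, 2) * 0.5^4 = 0.375.
    print(p_bernoulli(4, 2, 10, 5), math.comb(4, 2) * 0.5 ** 4)
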

def average_index(x):
    # Mean index of a distribution over positions 0..len(x)-1.
    total = 0
    for k in range(0, len(x)):
        total += k * x[k]
    return total / np.sum(x)


def compute_cumulative_probability(N, bases, p_n):
    # p_n = np.zeros(N)
    # p_n.fill(0.5)
    states = [[]]
    flips = set()
    for i in range(1, len(bases)):
        # (base, _) = bases[i]
        (_, flip) = bases[i]
        # p_forward = 0
        # p_backward = 0
        # for k in range(0, N - 1):
        #     p_forward += base[k + 1] * next_p[k]
        #     p_backward += base[k] * next_p[k + 1]
        if flip in flips:
            # p_n[flip] -= p_forward
            # p_n[flip] += p_backward
            flips.remove(flip)
        else:
            # p_n[flip] += p_forward
            # p_n[flip] -= p_backward
            flips.add(flip)
        states.append(flips.copy())
    # np.clip(p_n, 0, 1, p_n)
    # print('Contribution probabilities', p_n)
    min_p_n = np.min(p_n)
    max_p_n = np.max(p_n)
    p_k = np.zeros(N)
    for k in range(0, N):
        stack = [(k, len(bases) - 1)]
        probabilities = np.zeros((N, len(bases)))
        probabilities.fill(-1)
        while len(stack) > 0:
            (i, base_index) = stack.pop()
            (base, flip) = bases[base_index]
            if base_index == 0:
                probabilities[i, 0] = base[i]
            else:
                left = i - 1
                right = i + 1
                state = states[base_index - 1]
                p_flip = max(min(p_n[flip] + 0.5, 1.0), 0)
                if flip in state:
                    p_flip = 1 - p_flip
                p_left = probabilities[left, base_index - 1] if left >= 0 else 0
                p_right = probabilities[right, base_index - 1] if right < N else 0
                if p_left >= 0 and p_right >= 0:
                    probabilities[i, base_index] = base[i] * p_left * (1 - p_flip) + base[i] * p_right * p_flip
                else:
                    stack.append((i, base_index))
                    if p_left < 0:
                        stack.append((left, base_index - 1))
                    if p_right < 0:
                        stack.append((right, base_index - 1))
        p_k[k] = probabilities[k][-1]
    np.divide(p_k, np.sum(p_k), p_k)
    return p_k


# 8, 32, 2^5
# 10, 64, 2^6
# 12, 128, 2^7
# 14, 256, 2^8
# 16, 512, 2^9
# 18, 1024, 2^10
# 20, 2048, 2^11
# 22, 4096, 2^12
def main():
    N = 16
    sample_size = 32
    e_bits = 2

    # Draw random samples of the target function: the parity of the first e_bits bits.
    sample_ids = set()
    samples = []
    dist = compute_pseudopascal(N)
    print(dist)
    for i in range(0, sample_size):
        x = random_x(N)
        y = int(xor_n(x, e_bits))
        p = Point(x, y)
        p_id = p.id()
        if p_id in sample_ids:
            continue
        sample_ids.add(p_id)
        samples.append(p)

    # Bucket every other sample by its Hamming distance from each origin point ("chords").
    chords = [{} for _ in range(0, len(samples))]
    for i in range(0, len(samples)):
        a = samples[i]
        for j in range(i + 1, len(samples)):
            b = samples[j]
            distance = hamming_distance(a, b)
            if distance not in chords[i]:
                chords[i][distance] = []
            chords[i][distance].append(j)
            if distance not in chords[j]:
                chords[j][distance] = []
            chords[j][distance].append(i)

    probability = np.zeros((N, N))
    scalars = np.ones(N)
    for i in range(0, len(samples)):
        origin = samples[i]
        for (distance, points) in chords[i].items():
            n = len(points)
            t = math.comb(N, distance)
            a = sum([0 if origin.y == samples[index].y else 1 for index in points])
            for k in range(1, N - 1):
                p = dist[k][distance - 1]
                prob_at_k = p_bernoulli(n, a, t, p)
                for flip in range(0, N):
                    a_flip = sum([0 if origin.y == samples[index].y and origin.x[flip] == samples[index].x[flip] or origin.y != samples[index].y and origin.x[flip] != samples[index].x[flip] else 1 for index in points])
                    p_forward = dist[k - 1][distance - 1]
                    p_backward = dist[k + 1][distance - 1]
                    prob_at_k_forward = p_bernoulli(n, a_flip, t, p_forward)
                    prob_at_k_backward = p_bernoulli(n, a_flip, t, p_backward)
                    # prob_at_k_backward = 0
                    probability[k][flip] += (n / t) * prob_at_k * (prob_at_k_forward - prob_at_k_backward)
                    # probability[k][flip] *= prob_at_k * prob_at_k_forward
                # scalars[k] *= np.max(probability[k])
                # np.divide(probability[k], np.max(probability[k]), probability[k])
    # print(scalars)
    print(probability)
    return

    coherent_distances = np.zeros(N + 1)
    incoherent_distances = np.zeros(N + 1)
    total_distances = np.zeros(N + 1)
    for i in range(0, len(samples)):
        coherent_distances.fill(0)
        incoherent_distances.fill(0)
        total_distances.fill(0)
        a = samples[i]
        for j in range(0, len(samples)):
            b = samples[j]
            distance = hamming_distance(a, b)
            is_coherent = a.y == b.y
            total_distances[distance] += 1
            if is_coherent:
                coherent_distances[distance] += 1
            else:
                incoherent_distances[distance] += 1
        print(total_distances)
        print(incoherent_distances)
        print()
        for d in range(1, N + 1):
            n = coherent_distances[d] + incoherent_distances[d]
            if n == 0:
                continue
            local_probability = np.ones(N)
            for k in range(0, N):
                a = incoherent_distances[d]
                t = math.comb(N, d)
                p = dist[k][d - 1]
                prob = p_bernoulli(int(n), int(a), t, p)
                local_probability[k] = prob
                probability[i][k] *= prob
            print(local_probability)
        np.divide(probability[i], np.sum(probability[i]), probability[i])
    print()
    print(probability)

    total_probability = np.ones(N)
    for i in range(0, len(samples)):
        np.multiply(probability[i], total_probability, total_probability)
    np.divide(total_probability, np.sum(total_probability), total_probability)
    print(total_probability)
    return

    # confusion = confusion_probabilities(N, samples)
    # print(confusion)
    # return

    # for i in range(0, 2**N):
    #     x = decode(i, N)
    #     y = int(xor(x))
    #     samples.append(Point(x, y))

    base = np.zeros(N)
    current = np.zeros(N)
    cumulative_probability = np.ones(N)
    flip_likelihood = np.zeros(N)
    cumulative_deltas = np.zeros(N)
    direction = -1
    flips = set()
    bases = []
    last_flip = -1
    for _ in range(0, 2 ** N):
        lowest_err = -1
        use_flip = -1
        for flip in range(-1, N):
            coherent_distances = np.zeros(N + 1)
            incoherent_distances = np.zeros(N + 1)
            all_coherent = True
            for i in range(0, len(samples)):
                a = samples[i]
                for j in range(0, len(samples)):
                    b = samples[j]
                    distance = hamming_distance(a, b)
                    is_coherent = ((flip < 0 or a.x[flip] == b.x[flip]) and a.y == b.y) or ((flip >= 0 and a.x[flip] != b.x[flip]) and a.y != b.y)
                    if is_coherent:
                        coherent_distances[distance] += 1
                    else:
                        incoherent_distances[distance] += 1
                        all_coherent = False
            if all_coherent:
                print('Flip and halt', flip)
                return

            # print(coherent_distances, incoherent_distances)
            # est_incoherence = np.divide(incoherent_distances, np.add(coherent_distances, incoherent_distances))
            # print(est_incoherence)
            probability = np.ones(N)
            # np.divide(probability, np.sum(probability), probability)
            for j in range(1, N + 1):
                n = coherent_distances[j] + incoherent_distances[j]
                if n == 0:
                    continue
                for k in range(0, N):
                    a = incoherent_distances[j]
                    t = math.comb(N, j) * (2 ** N)
                    p = dist[k][j - 1] * (2 ** N)
                    prob = p_bernoulli(int(n), int(a), t, p)
                    probability[k] *= prob
            np.divide(probability, np.sum(probability), probability)
            if flip < 0:
                np.copyto(base, probability)
            else:
                np.copyto(current, probability)
            # print(k, i, min_true_value, max_true_value)
            # confidence = (coherent_distances[i] + incoherent_distances[i]) / math.comb(N, i)  # probability that the sample is representative
            # err += abs(est_incoherence[i] - known_incoherence_at_k[i - 1]) * confidence
            # denom += 1
            # print(flip, k, err)
            # err /= denom
            # if flip < 0:
            #     base[k] = probability
            # else:
            #     current[k] = probability
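            # The branch below compares the order distribution for this candidate flip
            # (`current`) against the unflipped baseline (`base`): p_forward accumulates the
            # mass consistent with the flip being one order apart in the correct direction,
            # p_backward the mass for the wrong direction, and candidates are ranked by their
            # difference.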
            if flip >= 0:
                if np.sum(current) == 0:
                    continue
                np.divide(current, np.sum(current), current)
                # print(current)
                # temp = np.roll(cumulative_probability, -1)
                # temp[-1] = 1.0
                # np.multiply(current, temp, current)
                # np.divide(current, np.sum(current), current)
                p_forward = 0
                p_backward = 0
                for i in range(1, N):
                    p_forward += base[i] * current[i - 1]
                for i in range(0, N - 1):
                    p_backward += base[i] * current[i + 1]
                scale = 0.01
                if flip in flips:
                    flip_likelihood[flip] += scale * p_backward
                    flip_likelihood[flip] -= scale * p_forward
                else:
                    flip_likelihood[flip] -= scale * p_backward
                    flip_likelihood[flip] += scale * p_forward
                delta = p_forward - p_backward
                print(flip, current, p_forward, p_backward)
                base_index = average_index(base)
                current_index = average_index(current)
                err = abs(1 - (base_index - current_index))
                print(base_index, current_index, err)
                # base_index = average_index(cumulative_probability)
                # new_index = average_index(current)
                # if isnan(new_index):
                #     continue
                # np.divide(current, np.sum(current), current)
                # np.subtract(1, current, current)
                # print(flip, p_forward, p_backward, current)
                if delta > 0 and (use_flip < 0 or delta > lowest_err):
                    use_flip = flip
                    lowest_err = delta
                # cumulative_deltas[flip] += 0
                # for k in range(0, N - 1):
                #     value = current[k] * cumulative_probability[k + 1]
                #     if use_flip < 0 or value > lowest_err:
                #         use_flip = flip
                #         lowest_err = value
                # print(flip, highest_value)
            else:
                # p_next = np.zeros(N)
                # for i in range(0, N):
                #     P = 0.0
                #     for j in range(0, N):
                #         if i == j:
                #             continue
                #         P += base[i] * (1 - base[j])
                #     p_next[i] = P
                # base = p_next
                # base[0] = 0
                np.divide(base, np.sum(base), base)
                bases.append((base.copy(), last_flip))
                # bases.insert(0, base.copy())
                # cumulative_probability = compute_cumulative_probability(N, bases)
                # p_forward = 0
                # p_backward = 0
                # for i in range(1, N):
                #     p_forward += cumulative_probability[i] * base[i - 1]
                # for i in range(0, N - 1):
                #     p_backward += cumulative_probability[i] * base[i + 1]
                print('Base', base)
                # np.subtract(1, base, base)
                # print(cumulative_probability)
                # shift_left = np.roll(cumulative_probability, -1)
                # shift_left[-1] = 0.0
                # print('Shift Left', p_forward, shift_left)
                # shift_right = np.roll(cumulative_probability, 1)
                # shift_right[0] = 0.0
                # print('Shift Right', p_backward, shift_right)
                # p_next = np.add(np.multiply(shift_left, 0.5), np.multiply(shift_right, 0.5))
                # p_next[0] = 0
                # np.divide(p_next, np.sum(p_next), p_next)
                # print('Next', p_next)
                # print(cumulative_probability)
                # print(base)
                # np.multiply(base, p_next, cumulative_probability)
                # cumulative_probability[0] = 0
                # np.multiply(cumulative_probability, shift_right, cumulative_probability)
                # np.divide(cumulative_probability, np.sum(cumulative_probability), cumulative_probability)
                cumulative_probability = compute_cumulative_probability(N, bases, flip_likelihood)
                print('Cumulative', cumulative_probability)
                print('Likelihood', flip_likelihood)
                # cumulative_probability[0] = 0

        # use_flip = -1
        # if direction < 0:
        #     use_flip = np.argmax(cumulative_deltas)
        #     if cumulative_deltas[use_flip] < 0:
        #         use_flip = np.argmin(cumulative_deltas)
        #         direction = 1
        #         # cumulative_deltas.fill(0)
        # else:
        #     use_flip = np.argmin(cumulative_deltas)
        #     if cumulative_deltas[use_flip] > 0:
        #         use_flip = np.argmax(cumulative_deltas)
        #         direction = -1
        #         # cumulative_deltas.fill(0)
        # if direction < 0:
        #     cumulative_probability[0] = 0
        # else:
        #     cumulative_probability[-1] = 0
        # np.divide(cumulative_probability, np.sum(cumulative_probability), cumulative_probability)
        # print(cumulative_deltas)
        # use_flip = -1
        # highest_p = 0
        # for i in range(0, N):
        #     p = flip_likelihood[i]
        #     if i in flips:
        #         p = -p
        #     if use_flip < 0 or p > highest_p:
        #         use_flip = i
        #         highest_p = p
        # if not use_flip in flips and highest_p < 0 or use_flip in flips and highest_p > 0:
        #     flip_likelihood[use_flip] *= -1.0
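        # After scanning every candidate, use_flip holds the flip with the largest positive
        # delta seen in this pass. If no candidate improved the ordering the search stops;
        # otherwise the flip is recorded and applied by toggling y on every sample whose
        # chosen bit is set.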
        if use_flip < 0:
            return

        last_flip = use_flip
        if use_flip in flips:
            flips.remove(use_flip)
        else:
            flips.add(use_flip)
        print('Flip', use_flip, lowest_err)
        print(flips)
        cumulative_deltas[use_flip] = -cumulative_deltas[use_flip]
        for p in samples:
            if p.x[use_flip]:
                p.y ^= 1


if __name__ == "__main__":
    main()