""" ported from https://github.com/privy-io/shamir-secret-sharing/blob/main/src/index.ts i barely dealt with actual hands-on cryptography like this, neither do i understand the math behind it so this is probably not the best implementation, but it works quite well for my private services """ import numpy as np import secrets import ctypes import asyncio import platform __name__= "Shamir´s Secret Sharing by serpent192 / Privy Services (port)" if platform.system() == "Windows": libc = ctypes.CDLL("msvcrt.dll") else: libc = ctypes.CDLL(None) malloc= libc.malloc malloc.restype= ctypes.c_void_p malloc.argtypes= [ctypes.c_size_t] _loop = None def as_run(coro): global _loop if _loop is None: _loop= asyncio.new_event_loop() asyncio.set_event_loop(_loop) return _loop.run_until_complete(coro) class SharmirsShares: def __init__(self): self.ExpIndex: list= [ 0x01, 0xe5, 0x4c, 0xb5, 0xfb, 0x9f, 0xfc, 0x12, 0x03, 0x34, 0xd4, 0xc4, 0x16, 0xba, 0x1f, 0x36, 0x05, 0x5c, 0x67, 0x57, 0x3a, 0xd5, 0x21, 0x5a, 0x0f, 0xe4, 0xa9, 0xf9, 0x4e, 0x64, 0x63, 0xee, 0x11, 0x37, 0xe0, 0x10, 0xd2, 0xac, 0xa5, 0x29, 0x33, 0x59, 0x3b, 0x30, 0x6d, 0xef, 0xf4, 0x7b, 0x55, 0xeb, 0x4d, 0x50, 0xb7, 0x2a, 0x07, 0x8d, 0xff, 0x26, 0xd7, 0xf0, 0xc2, 0x7e, 0x09, 0x8c, 0x1a, 0x6a, 0x62, 0x0b, 0x5d, 0x82, 0x1b, 0x8f, 0x2e, 0xbe, 0xa6, 0x1d, 0xe7, 0x9d, 0x2d, 0x8a, 0x72, 0xd9, 0xf1, 0x27, 0x32, 0xbc, 0x77, 0x85, 0x96, 0x70, 0x08, 0x69, 0x56, 0xdf, 0x99, 0x94, 0xa1, 0x90, 0x18, 0xbb, 0xfa, 0x7a, 0xb0, 0xa7, 0xf8, 0xab, 0x28, 0xd6, 0x15, 0x8e, 0xcb, 0xf2, 0x13, 0xe6, 0x78, 0x61, 0x3f, 0x89, 0x46, 0x0d, 0x35, 0x31, 0x88, 0xa3, 0x41, 0x80, 0xca, 0x17, 0x5f, 0x53, 0x83, 0xfe, 0xc3, 0x9b, 0x45, 0x39, 0xe1, 0xf5, 0x9e, 0x19, 0x5e, 0xb6, 0xcf, 0x4b, 0x38, 0x04, 0xb9, 0x2b, 0xe2, 0xc1, 0x4a, 0xdd, 0x48, 0x0c, 0xd0, 0x7d, 0x3d, 0x58, 0xde, 0x7c, 0xd8, 0x14, 0x6b, 0x87, 0x47, 0xe8, 0x79, 0x84, 0x73, 0x3c, 0xbd, 0x92, 0xc9, 0x23, 0x8b, 0x97, 0x95, 0x44, 0xdc, 0xad, 0x40, 0x65, 0x86, 0xa2, 0xa4, 0xcc, 0x7f, 0xec, 0xc0, 0xaf, 0x91, 0xfd, 0xf7, 0x4f, 0x81, 0x2f, 0x5b, 0xea, 0xa8, 0x1c, 0x02, 0xd1, 0x98, 0x71, 0xed, 0x25, 0xe3, 0x24, 0x06, 0x68, 0xb3, 0x93, 0x2c, 0x6f, 0x3e, 0x6c, 0x0a, 0xb8, 0xce, 0xae, 0x74, 0xb1, 0x42, 0xb4, 0x1e, 0xd3, 0x49, 0xe9, 0x9c, 0xc8, 0xc6, 0xc7, 0x22, 0x6e, 0xdb, 0x20, 0xbf, 0x43, 0x51, 0x52, 0x66, 0xb2, 0x76, 0x60, 0xda, 0xc5, 0xf3, 0xf6, 0xaa, 0xcd, 0x9a, 0xa0, 0x75, 0x54, 0x0e, 0x01, ] self.LogIndex: list= [ 0x00, 0xff, 0xc8, 0x08, 0x91, 0x10, 0xd0, 0x36, 0x5a, 0x3e, 0xd8, 0x43, 0x99, 0x77, 0xfe, 0x18, 0x23, 0x20, 0x07, 0x70, 0xa1, 0x6c, 0x0c, 0x7f, 0x62, 0x8b, 0x40, 0x46, 0xc7, 0x4b, 0xe0, 0x0e, 0xeb, 0x16, 0xe8, 0xad, 0xcf, 0xcd, 0x39, 0x53, 0x6a, 0x27, 0x35, 0x93, 0xd4, 0x4e, 0x48, 0xc3, 0x2b, 0x79, 0x54, 0x28, 0x09, 0x78, 0x0f, 0x21, 0x90, 0x87, 0x14, 0x2a, 0xa9, 0x9c, 0xd6, 0x74, 0xb4, 0x7c, 0xde, 0xed, 0xb1, 0x86, 0x76, 0xa4, 0x98, 0xe2, 0x96, 0x8f, 0x02, 0x32, 0x1c, 0xc1, 0x33, 0xee, 0xef, 0x81, 0xfd, 0x30, 0x5c, 0x13, 0x9d, 0x29, 0x17, 0xc4, 0x11, 0x44, 0x8c, 0x80, 0xf3, 0x73, 0x42, 0x1e, 0x1d, 0xb5, 0xf0, 0x12, 0xd1, 0x5b, 0x41, 0xa2, 0xd7, 0x2c, 0xe9, 0xd5, 0x59, 0xcb, 0x50, 0xa8, 0xdc, 0xfc, 0xf2, 0x56, 0x72, 0xa6, 0x65, 0x2f, 0x9f, 0x9b, 0x3d, 0xba, 0x7d, 0xc2, 0x45, 0x82, 0xa7, 0x57, 0xb6, 0xa3, 0x7a, 0x75, 0x4f, 0xae, 0x3f, 0x37, 0x6d, 0x47, 0x61, 0xbe, 0xab, 0xd3, 0x5f, 0xb0, 0x58, 0xaf, 0xca, 0x5e, 0xfa, 0x85, 0xe4, 0x4d, 0x8a, 0x05, 0xfb, 0x60, 0xb7, 0x7b, 0xb8, 0x26, 0x4a, 0x67, 0xc6, 0x1a, 0xf8, 0x69, 0x25, 0xb3, 0xdb, 0xbd, 0x66, 0xdd, 0xf1, 0xd2, 0xdf, 0x03, 0x8d, 0x34, 0xd9, 0x92, 0x0d, 0x63, 0x55, 0xaa, 0x49, 0xec, 0xbc, 0x95, 0x3c, 0x84, 0x0b, 0xf5, 0xe6, 0xe7, 0xe5, 0xac, 0x7e, 0x6e, 0xb9, 0xf9, 0xda, 0x8e, 0x9a, 0xc9, 0x24, 0xe1, 0x0a, 0x15, 0x6b, 0x3a, 0xa0, 0x51, 0xf4, 0xea, 0xb2, 0x97, 0x9e, 0x5d, 0x22, 0x88, 0x94, 0xce, 0x19, 0x01, 0x71, 0x4c, 0xa5, 0xe3, 0xc5, 0x31, 0xbb, 0xcc, 0x1f, 0x2d, 0x3b, 0x52, 0x6f, 0xf6, 0x2e, 0x89, 0xf7, 0xc0, 0x68, 0x1b, 0x64, 0x04, 0x06, 0xbf, 0x83, 0x38, ] def add(self, a: int, b: int) -> int: if not isinstance(a, int) or a < 0 or a > 255: raise ValueError("number a out of uint8 range") if not isinstance(b, int) or b < 0 or b > 255: raise ValueError("number b out of uint8 range") return a ^ b def multiply(self, a: int, b:int) -> int: if not isinstance(a, int) or a < 0 or a > 255: raise ValueError("number a out of uint8 range") if not isinstance(b, int) or b < 0 or b > 255: raise ValueError("number b out of uint8 range") loga: int = self.LogIndex[a] logb: int = self.LogIndex[b] if loga is None or logb is None: raise ValueError("log table value {a} or {b} is missing".format(a=loga, b=logb)) sum_x: int= (loga + logb) % 255 res: int= self.ExpIndex[sum_x] if res is None: raise ValueError("exponential table value {res} is missing".format(res)) if a == 0 or b == 0: return 0 return res def div(self, a: int, b:int) -> int: if not isinstance(a, int) or a < 0 or a > 255: raise ValueError("number a out of uint8 range") if not isinstance(b, int) or b < 0 or b > 255: raise ValueError("number b out of uint8 range") loga: int = self.LogIndex[a] logb: int = self.LogIndex[b] if loga is None or logb is None: raise ValueError("log table value {a} or {b} is missing".format(a=loga, b=logb)) difference: float= (loga - logb + 255) % 255 res: int= self.ExpIndex[difference] if res is None: raise ValueError("exponential table value {res} is missing".format(res)) if a == 0: return 0 if b == 0: raise ValueError("cannot divide by zero") return res def interp_polynomial(self, xsamp: np.ndarray, ysamp: np.ndarray, x: int) -> int: if len(xsamp) != len(ysamp): raise ValueError("sample length mismatch") lim: int= len(xsamp) base: int= 0 res: int= 0 for i in range(lim): base= 1 for j in range(lim): if i == j: continue num: int= self.add(int(x), int(xsamp[j])) denom: int= self.add(int(xsamp[i]), int(xsamp[j])) term: int= self.div(num, denom) base= self.multiply(base, term) res: int= self.add(res, self.multiply(int(ysamp[i]), int(base))) return res def evaluate(self, coefficients: np.ndarray, x: int, degree: int) -> int: if x == 0: raise ValueError("cannot evaluate polynominal with zero") res: int= int(coefficients[degree]) for i in range(degree - 1, -1, -1): coefficient: int= int(coefficients[i]) res: int= self.add(self.multiply(res, int(x)), coefficient) return res def newCoeff(self, interception: int, degree: int) -> np.ndarray: coefficients: np.ndarray= np.zeros(degree + 1, dtype=np.uint8) coefficients[0]= interception randomBytes:int= np.frombuffer(secrets.token_bytes(degree), dtype=np.uint8) coefficients[1:]= randomBytes return coefficients def newCoords(self) -> np.ndarray: coords: np.ndarray= np.zeros(255, dtype=np.uint8) for i in range(255): coords[i]= i + 1 rindices: bytes= secrets.token_bytes(255) for i in range(255): j: int= rindices[i] % 255 temp: int= coords[i] coords[i]= coords[j] coords[j]= temp coords.flags.writeable= False return coords async def combine_Shares(self, shares: np.ndarray) -> str: if len(shares) == 0: raise ValueError("No shares provided") share1= shares[0] print(share1) if not isinstance(share1, (bytes, np.ndarray)): raise TypeError("Each share must be bytes or np.ndarray") share1_len = len(share1) if share1_len < 2: raise ValueError("Each share must be at least 2 bytes") for share in shares: if not isinstance(share, (bytes, np.ndarray)): raise TypeError("Each share must be a bytes or np.ndarray") if len(share) < 2: raise ValueError("Each share must be at least 2 bytes") if len(share) != share1_len: raise ValueError("All shares must have the same byte length") shares_len: int= len(shares) #share1_bytelen: int= len(share1) # share1.encode("utf-8") share1_bytelen: int= share1.size secret_len: int= share1_bytelen - 1 secret: np.ndarray= np.zeros(secret_len, dtype=np.uint8) xsamps: np.ndarray= np.zeros(shares_len, dtype=np.uint8) ysamps: np.ndarray= np.zeros(shares_len, dtype=np.uint8) samples: set= set() for i in range(shares_len): share: str= shares[i] sample: str= share[share1_bytelen - 1] if sample in samples: raise ValueError("shares must contain unique values but a duplicate was found") samples.add(sample) xsamps[i]= sample #xsamps[i] = shares[i][-1] for i in range(secret_len): for j in range(shares_len): ysamps[j]= shares[j][i] secret[i]= self.interp_polynomial(xsamps, ysamps, 0) return secret #return bytes(secret).decode('utf-8') async def split_iShares(self, secret: np.ndarray, shares: int, threshold: int) -> np.ndarray: if len(secret.encode("utf-8")) <= 1: raise ValueError("secret cannot be empty") elif shares > 255 or shares < 2: raise ValueError("shares must be at least 2 and at most 255") elif threshold > 255 or threshold < 2: raise ValueError("threshold must be at least 2 and at most 255") elif shares < threshold: raise ValueError("shares cannot be smaller than threshold") secret_bytes: int= secret.encode("utf-8") secret_length: int= len(secret_bytes) result: np.ndarray = np.zeros((shares, secret_length + 1), dtype=np.uint8) xcoords: np.ndarray= self.newCoords() for i in range(shares): result[i, :] = np.zeros(secret_length + 1, dtype=np.uint8) result[i, -1] = xcoords[i] degree: int= threshold - 1 for i in range(secret_length): byte: int= secret_bytes[i] coefficients: np.ndarray= self.newCoeff(byte, degree) for j in range(shares): x: int= xcoords[j] y: int= self.evaluate(coefficients, x, degree) result[j][i]= y return result @staticmethod def c_string(py_str: str): data= py_str.encode("utf-8") + b"\x00" buf= malloc(len(data)) ctypes.memmove(buf, data, len(data)) return ctypes.c_char_p(buf) @staticmethod @ctypes.CFUNCTYPE(ctypes.c_char_p, ctypes.POINTER(ctypes.c_char_p), ctypes.c_int) def combine_wrapper(c_array, length): shares= [c_array[i].decode("utf-8") for i in range(length)] arr= np.array(shares, dtype=object) result= as_run(SharmirsShares.combine_Shares(arr)) return SharmirsShares.c_string(result) @staticmethod @ctypes.CFUNCTYPE(ctypes.c_char_p, ctypes.POINTER(ctypes.c_char_p), ctypes.c_int, ctypes.c_int, ctypes.c_int) def split_wrapper(c_array, length, shares_num, threshold): secret= [c_array[i].decode("utf-8") for i in range(length)] arr= np.array(secret, dtype=object) result= as_run(SharmirsShares.split_iShares(arr, shares_num, threshold)) joined= "|".join(result.tolist()) return SharmirsShares.c_string(joined) exports = { "combine_wrapper": SharmirsShares.combine_wrapper, "split_wrapper": SharmirsShares.split_wrapper } """ split_shares= asyncio.run(SharmirsShares().split_iShares("test", 3, 3)) print('Split shares: \'{}\''.format(split_shares)) shares = np.array([ [159, 88, 241, 80, 66], [48, 36, 234, 104, 47], [148, 88, 129, 27, 121] ]) combined_shares= asyncio.run(SharmirsShares().combine_Shares(shares)) print('Combined shares: \'{}\''.format(bytes(combined_shares).decode('utf-8'))) """