Files
cryptography/crypto/shamirs_secret_sharing/shamirs_secret_sharing.py

331 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
ported from https://github.com/privy-io/shamir-secret-sharing/blob/main/src/index.ts
I have barely dealt with actual hands-on cryptography like this,
nor do I fully understand the math behind it, so this is
probably not the best implementation, but it works quite
well for my private services.
"""
import numpy as np
import secrets
import ctypes
import asyncio
import platform
# NOTE: this rebinds the module's ``__name__`` to a human-readable banner, so
# an ``if __name__ == "__main__"`` guard placed after this line never matches.
__name__ = "Shamir´s Secret Sharing by serpent192 / Privy Services (port)"

# Load the C runtime: msvcrt on Windows, otherwise whatever CDLL(None) resolves
# to for the current process.
libc = ctypes.CDLL("msvcrt.dll" if platform.system() == "Windows" else None)

# Declare malloc's prototype so the returned pointer is not truncated to a
# C int by ctypes' default result type.
malloc = libc.malloc
malloc.argtypes = [ctypes.c_size_t]
malloc.restype = ctypes.c_void_p
# Event loop shared by every as_run() call; created lazily on first use.
_loop = None


def as_run(coro):
    """Run *coro* to completion on a module-owned event loop.

    The loop is created on first use, installed as the current event loop and
    then reused by later calls. If the cached loop has been closed in the
    meantime a fresh one is created instead of failing with
    "Event loop is closed".

    Args:
        coro: The coroutine (or other awaitable accepted by
            ``run_until_complete``) to execute.

    Returns:
        Whatever *coro* returns.

    Raises:
        RuntimeError: Raised by ``run_until_complete`` when the loop is
            already running, e.g. when called from inside a coroutine.
    """
    global _loop
    if _loop is None or _loop.is_closed():
        _loop = asyncio.new_event_loop()
        asyncio.set_event_loop(_loop)
    return _loop.run_until_complete(coro)
class SharmirsShares:
    """Shamir's Secret Sharing over a 256-element field, one polynomial per byte.

    A share is the sequence of polynomial evaluations (one per secret byte)
    followed by a single trailing byte holding the x-coordinate at which the
    share was evaluated. Multiplication and division go through the
    ``ExpIndex`` / ``LogIndex`` lookup tables; addition is XOR.
    """

    # ExpIndex[k] is the field element whose table logarithm is k.
    ExpIndex: list = [
        0x01, 0xe5, 0x4c, 0xb5, 0xfb, 0x9f, 0xfc, 0x12, 0x03, 0x34, 0xd4, 0xc4, 0x16, 0xba, 0x1f, 0x36,
        0x05, 0x5c, 0x67, 0x57, 0x3a, 0xd5, 0x21, 0x5a, 0x0f, 0xe4, 0xa9, 0xf9, 0x4e, 0x64, 0x63, 0xee,
        0x11, 0x37, 0xe0, 0x10, 0xd2, 0xac, 0xa5, 0x29, 0x33, 0x59, 0x3b, 0x30, 0x6d, 0xef, 0xf4, 0x7b,
        0x55, 0xeb, 0x4d, 0x50, 0xb7, 0x2a, 0x07, 0x8d, 0xff, 0x26, 0xd7, 0xf0, 0xc2, 0x7e, 0x09, 0x8c,
        0x1a, 0x6a, 0x62, 0x0b, 0x5d, 0x82, 0x1b, 0x8f, 0x2e, 0xbe, 0xa6, 0x1d, 0xe7, 0x9d, 0x2d, 0x8a,
        0x72, 0xd9, 0xf1, 0x27, 0x32, 0xbc, 0x77, 0x85, 0x96, 0x70, 0x08, 0x69, 0x56, 0xdf, 0x99, 0x94,
        0xa1, 0x90, 0x18, 0xbb, 0xfa, 0x7a, 0xb0, 0xa7, 0xf8, 0xab, 0x28, 0xd6, 0x15, 0x8e, 0xcb, 0xf2,
        0x13, 0xe6, 0x78, 0x61, 0x3f, 0x89, 0x46, 0x0d, 0x35, 0x31, 0x88, 0xa3, 0x41, 0x80, 0xca, 0x17,
        0x5f, 0x53, 0x83, 0xfe, 0xc3, 0x9b, 0x45, 0x39, 0xe1, 0xf5, 0x9e, 0x19, 0x5e, 0xb6, 0xcf, 0x4b,
        0x38, 0x04, 0xb9, 0x2b, 0xe2, 0xc1, 0x4a, 0xdd, 0x48, 0x0c, 0xd0, 0x7d, 0x3d, 0x58, 0xde, 0x7c,
        0xd8, 0x14, 0x6b, 0x87, 0x47, 0xe8, 0x79, 0x84, 0x73, 0x3c, 0xbd, 0x92, 0xc9, 0x23, 0x8b, 0x97,
        0x95, 0x44, 0xdc, 0xad, 0x40, 0x65, 0x86, 0xa2, 0xa4, 0xcc, 0x7f, 0xec, 0xc0, 0xaf, 0x91, 0xfd,
        0xf7, 0x4f, 0x81, 0x2f, 0x5b, 0xea, 0xa8, 0x1c, 0x02, 0xd1, 0x98, 0x71, 0xed, 0x25, 0xe3, 0x24,
        0x06, 0x68, 0xb3, 0x93, 0x2c, 0x6f, 0x3e, 0x6c, 0x0a, 0xb8, 0xce, 0xae, 0x74, 0xb1, 0x42, 0xb4,
        0x1e, 0xd3, 0x49, 0xe9, 0x9c, 0xc8, 0xc6, 0xc7, 0x22, 0x6e, 0xdb, 0x20, 0xbf, 0x43, 0x51, 0x52,
        0x66, 0xb2, 0x76, 0x60, 0xda, 0xc5, 0xf3, 0xf6, 0xaa, 0xcd, 0x9a, 0xa0, 0x75, 0x54, 0x0e, 0x01,
    ]

    # LogIndex[v] is the table logarithm of field element v. Entry 0 is only a
    # placeholder: multiply() and div() handle zero operands before any lookup.
    LogIndex: list = [
        0x00, 0xff, 0xc8, 0x08, 0x91, 0x10, 0xd0, 0x36, 0x5a, 0x3e, 0xd8, 0x43, 0x99, 0x77, 0xfe, 0x18,
        0x23, 0x20, 0x07, 0x70, 0xa1, 0x6c, 0x0c, 0x7f, 0x62, 0x8b, 0x40, 0x46, 0xc7, 0x4b, 0xe0, 0x0e,
        0xeb, 0x16, 0xe8, 0xad, 0xcf, 0xcd, 0x39, 0x53, 0x6a, 0x27, 0x35, 0x93, 0xd4, 0x4e, 0x48, 0xc3,
        0x2b, 0x79, 0x54, 0x28, 0x09, 0x78, 0x0f, 0x21, 0x90, 0x87, 0x14, 0x2a, 0xa9, 0x9c, 0xd6, 0x74,
        0xb4, 0x7c, 0xde, 0xed, 0xb1, 0x86, 0x76, 0xa4, 0x98, 0xe2, 0x96, 0x8f, 0x02, 0x32, 0x1c, 0xc1,
        0x33, 0xee, 0xef, 0x81, 0xfd, 0x30, 0x5c, 0x13, 0x9d, 0x29, 0x17, 0xc4, 0x11, 0x44, 0x8c, 0x80,
        0xf3, 0x73, 0x42, 0x1e, 0x1d, 0xb5, 0xf0, 0x12, 0xd1, 0x5b, 0x41, 0xa2, 0xd7, 0x2c, 0xe9, 0xd5,
        0x59, 0xcb, 0x50, 0xa8, 0xdc, 0xfc, 0xf2, 0x56, 0x72, 0xa6, 0x65, 0x2f, 0x9f, 0x9b, 0x3d, 0xba,
        0x7d, 0xc2, 0x45, 0x82, 0xa7, 0x57, 0xb6, 0xa3, 0x7a, 0x75, 0x4f, 0xae, 0x3f, 0x37, 0x6d, 0x47,
        0x61, 0xbe, 0xab, 0xd3, 0x5f, 0xb0, 0x58, 0xaf, 0xca, 0x5e, 0xfa, 0x85, 0xe4, 0x4d, 0x8a, 0x05,
        0xfb, 0x60, 0xb7, 0x7b, 0xb8, 0x26, 0x4a, 0x67, 0xc6, 0x1a, 0xf8, 0x69, 0x25, 0xb3, 0xdb, 0xbd,
        0x66, 0xdd, 0xf1, 0xd2, 0xdf, 0x03, 0x8d, 0x34, 0xd9, 0x92, 0x0d, 0x63, 0x55, 0xaa, 0x49, 0xec,
        0xbc, 0x95, 0x3c, 0x84, 0x0b, 0xf5, 0xe6, 0xe7, 0xe5, 0xac, 0x7e, 0x6e, 0xb9, 0xf9, 0xda, 0x8e,
        0x9a, 0xc9, 0x24, 0xe1, 0x0a, 0x15, 0x6b, 0x3a, 0xa0, 0x51, 0xf4, 0xea, 0xb2, 0x97, 0x9e, 0x5d,
        0x22, 0x88, 0x94, 0xce, 0x19, 0x01, 0x71, 0x4c, 0xa5, 0xe3, 0xc5, 0x31, 0xbb, 0xcc, 0x1f, 0x2d,
        0x3b, 0x52, 0x6f, 0xf6, 0x2e, 0x89, 0xf7, 0xc0, 0x68, 0x1b, 0x64, 0x04, 0x06, 0xbf, 0x83, 0x38,
    ]

    @staticmethod
    def _check_uint8(value, label: str) -> None:
        """Raise ValueError unless *value* is a Python int in 0..255."""
        if not isinstance(value, int) or value < 0 or value > 255:
            raise ValueError("number {} out of uint8 range".format(label))

    def add(self, a: int, b: int) -> int:
        """Return the field sum of *a* and *b* (bitwise XOR).

        Raises:
            ValueError: If either operand is not a Python int in 0..255.
        """
        self._check_uint8(a, "a")
        self._check_uint8(b, "b")
        return a ^ b

    def multiply(self, a: int, b: int) -> int:
        """Return the field product of *a* and *b* via the log/exp tables.

        Raises:
            ValueError: If either operand is not a Python int in 0..255.
        """
        self._check_uint8(a, "a")
        self._check_uint8(b, "b")
        # Zero has no logarithm, so it must be handled before the lookup.
        if a == 0 or b == 0:
            return 0
        return self.ExpIndex[(self.LogIndex[a] + self.LogIndex[b]) % 255]

    def div(self, a: int, b: int) -> int:
        """Return the field quotient ``a / b`` via the log/exp tables.

        Raises:
            ValueError: If either operand is not a Python int in 0..255, or
                if *b* is zero (including the ``0 / 0`` case).
        """
        self._check_uint8(a, "a")
        self._check_uint8(b, "b")
        # The divisor is checked first so that 0 / 0 is an error, not 0.
        if b == 0:
            raise ValueError("cannot divide by zero")
        if a == 0:
            return 0
        return self.ExpIndex[(self.LogIndex[a] - self.LogIndex[b]) % 255]

    def _lagrange_weights(self, xsamp, x) -> list:
        """Return the Lagrange basis values at *x* for the sample points *xsamp*.

        The i-th weight is the product over ``j != i`` of
        ``(x + xsamp[j]) / (xsamp[i] + xsamp[j])`` in field arithmetic. The
        weights depend only on the x-coordinates, so one set can be reused
        for every byte position of a secret.
        """
        point = int(x)
        xs = [int(value) for value in xsamp]
        weights = []
        for i, xi in enumerate(xs):
            basis = 1
            for j, xj in enumerate(xs):
                if i == j:
                    continue
                term = self.div(self.add(point, xj), self.add(xi, xj))
                basis = self.multiply(basis, term)
            weights.append(basis)
        return weights

    def interp_polynomial(self, xsamp: np.ndarray, ysamp: np.ndarray, x: int) -> int:
        """Evaluate at *x* the polynomial passing through the given samples.

        Args:
            xsamp: x-coordinates of the samples (values 0..255, pairwise
                distinct).
            ysamp: y-values of the samples, same length as *xsamp*.
            x: Point at which to evaluate the interpolated polynomial.

        Returns:
            The interpolated value as an int in 0..255.

        Raises:
            ValueError: If the sample lengths differ, a value is out of
                range, or two x-coordinates coincide (division by zero).
        """
        if len(xsamp) != len(ysamp):
            raise ValueError("sample length mismatch")
        res = 0
        for y, weight in zip(ysamp, self._lagrange_weights(xsamp, x)):
            res = self.add(res, self.multiply(int(y), weight))
        return res

    def evaluate(self, coefficients: np.ndarray, x: int, degree: int) -> int:
        """Evaluate a polynomial at *x* with Horner's scheme.

        Args:
            coefficients: Coefficients indexed by power; index 0 is the
                constant term and index *degree* the leading term.
            x: Non-zero evaluation point.
            degree: Degree of the polynomial.

        Raises:
            ValueError: If *x* is zero (that evaluation would be the constant
                term itself) or a value is out of range.
        """
        if x == 0:
            raise ValueError("cannot evaluate polynominal with zero")
        point = int(x)
        res = int(coefficients[degree])
        for i in range(degree - 1, -1, -1):
            res = self.add(self.multiply(res, point), int(coefficients[i]))
        return res

    def newCoeff(self, interception: int, degree: int) -> np.ndarray:
        """Return ``degree + 1`` uint8 coefficients for a fresh polynomial.

        Index 0 holds *interception* (the constant term); the remaining
        *degree* coefficients are drawn from ``secrets.token_bytes``.
        """
        coefficients = np.zeros(degree + 1, dtype=np.uint8)
        coefficients[0] = interception
        coefficients[1:] = np.frombuffer(secrets.token_bytes(degree), dtype=np.uint8)
        return coefficients

    def newCoords(self) -> np.ndarray:
        """Return the values 1..255 in shuffled order as a read-only uint8 array.

        Each position is swapped with a position chosen from a random byte
        taken modulo 255. Zero is never included.
        """
        coords = np.arange(1, 256, dtype=np.uint8)
        for i, random_byte in enumerate(secrets.token_bytes(255)):
            j = random_byte % 255
            coords[i], coords[j] = coords[j], coords[i]
        coords.flags.writeable = False
        return coords

    async def combine_Shares(self, shares: np.ndarray) -> np.ndarray:
        """Reconstruct the secret bytes from a collection of shares.

        Args:
            shares: Sequence of shares (a list, or a 2-D array whose rows are
                shares). Each share is ``bytes`` or a 1-D ``np.ndarray`` with
                the x-coordinate stored in its last element.

        Returns:
            The reconstructed secret as a uint8 array, one element shorter
            than a share. Decode it with ``.tobytes().decode("utf-8")`` when
            the secret was text.

        Raises:
            ValueError: If no shares are given, a share is shorter than two
                bytes, shares differ in length, two shares carry the same
                x-coordinate, or a share value is outside 0..255.
            TypeError: If a share is neither ``bytes`` nor ``np.ndarray``.
        """
        if len(shares) == 0:
            raise ValueError("No shares provided")

        share_len = None
        for share in shares:
            if not isinstance(share, (bytes, np.ndarray)):
                raise TypeError("Each share must be bytes or np.ndarray")
            current_len = len(share)
            if current_len < 2:
                raise ValueError("Each share must be at least 2 bytes")
            if share_len is None:
                share_len = current_len
            elif current_len != share_len:
                raise ValueError("All shares must have the same byte length")

        secret_len = share_len - 1

        # The last byte of every share is its x-coordinate; they must differ.
        xsamps = []
        seen = set()
        for share in shares:
            sample = int(share[secret_len])
            if sample in seen:
                raise ValueError("shares must contain unique values but a duplicate was found")
            seen.add(sample)
            xsamps.append(sample)

        # The basis at x = 0 depends only on the x-coordinates, so it is
        # computed once and reused for every byte of the secret.
        weights = self._lagrange_weights(xsamps, 0)

        secret = np.zeros(secret_len, dtype=np.uint8)
        for i in range(secret_len):
            value = 0
            for share, weight in zip(shares, weights):
                value = self.add(value, self.multiply(int(share[i]), weight))
            secret[i] = value
        return secret

    @staticmethod
    def _secret_to_bytes(secret) -> bytes:
        """Normalise a secret given as str, bytes-like or uint8 array to bytes."""
        if isinstance(secret, str):
            return secret.encode("utf-8")
        if isinstance(secret, (bytes, bytearray, memoryview)):
            return bytes(secret)
        if isinstance(secret, np.ndarray) and secret.dtype == np.uint8:
            return secret.tobytes()
        raise TypeError("secret must be str, bytes or a uint8 np.ndarray")

    async def split_iShares(self, secret: np.ndarray, shares: int, threshold: int) -> np.ndarray:
        """Split *secret* into *shares* shares, any *threshold* of which recover it.

        Args:
            secret: The secret as ``str`` (encoded as UTF-8), a bytes-like
                object, or a uint8 ``np.ndarray``.
            shares: Number of shares to produce, 2..255.
            threshold: Number of shares needed for reconstruction, 2..255 and
                not larger than *shares*.

        Returns:
            A uint8 array of shape ``(shares, len(secret_bytes) + 1)``. Row
            ``k`` is share ``k``; its last column is the share's x-coordinate.

        Raises:
            ValueError: If the secret is empty or the counts are out of range.
            TypeError: If *secret* has an unsupported type.
        """
        secret_bytes = self._secret_to_bytes(secret)
        if len(secret_bytes) == 0:
            raise ValueError("secret cannot be empty")
        if shares > 255 or shares < 2:
            raise ValueError("shares must be at least 2 and at most 255")
        if threshold > 255 or threshold < 2:
            raise ValueError("threshold must be at least 2 and at most 255")
        if shares < threshold:
            raise ValueError("shares cannot be smaller than threshold")

        secret_length = len(secret_bytes)
        result = np.zeros((shares, secret_length + 1), dtype=np.uint8)
        xcoords = self.newCoords()
        result[:, -1] = xcoords[:shares]

        degree = threshold - 1
        for i, byte in enumerate(secret_bytes):
            # One independent random polynomial per secret byte, with the
            # byte itself as the constant term.
            coefficients = self.newCoeff(byte, degree)
            for j in range(shares):
                result[j, i] = self.evaluate(coefficients, xcoords[j], degree)
        return result

    @staticmethod
    def _alloc_c_string(py_str: str) -> int:
        """Copy *py_str* as NUL-terminated UTF-8 into malloc'ed memory.

        Returns the raw address. The buffer is never released here; the
        receiver is responsible for freeing it.

        Raises:
            MemoryError: If ``malloc`` returns NULL.
        """
        data = py_str.encode("utf-8") + b"\x00"
        address = malloc(len(data))
        if not address:
            raise MemoryError("malloc failed while allocating a C string")
        ctypes.memmove(address, data, len(data))
        return address

    @staticmethod
    def c_string(py_str: str):
        """Return a ``ctypes.c_char_p`` pointing at a malloc'ed copy of *py_str*.

        The memory is not freed by this module; the receiver owns it.

        Raises:
            MemoryError: If ``malloc`` returns NULL.
        """
        return ctypes.c_char_p(SharmirsShares._alloc_c_string(py_str))

    @staticmethod
    @ctypes.CFUNCTYPE(ctypes.c_char_p, ctypes.POINTER(ctypes.c_char_p), ctypes.c_int)
    def combine_wrapper(c_array, length):
        """C callback: combine *length* hex-encoded shares into the secret text.

        NOTE(review): shares are exchanged as hexadecimal strings because raw
        share bytes may contain NUL and cannot travel as C strings; this
        matches what split_wrapper emits. Confirm against the C-side caller.

        Returns the address of a malloc'ed, NUL-terminated UTF-8 string that
        the caller must free. The address is returned as a plain int because
        a ctypes callback with a ``c_char_p`` result accepts bytes or an
        integer address, not a ``c_char_p`` instance.
        """
        shares = [
            np.frombuffer(bytes.fromhex(c_array[i].decode("utf-8")), dtype=np.uint8)
            for i in range(length)
        ]
        secret = as_run(SharmirsShares().combine_Shares(shares))
        return SharmirsShares._alloc_c_string(secret.tobytes().decode("utf-8"))

    @staticmethod
    @ctypes.CFUNCTYPE(ctypes.c_char_p, ctypes.POINTER(ctypes.c_char_p), ctypes.c_int, ctypes.c_int, ctypes.c_int)
    def split_wrapper(c_array, length, shares_num, threshold):
        """C callback: split a secret and return the shares joined by ``|``.

        NOTE(review): the *length* input strings are concatenated to form the
        secret, and each share is emitted as a hexadecimal string; confirm
        both conventions against the C-side caller.

        Returns the address of a malloc'ed, NUL-terminated string that the
        caller must free.
        """
        secret = "".join(c_array[i].decode("utf-8") for i in range(length))
        result = as_run(SharmirsShares().split_iShares(secret, shares_num, threshold))
        joined = "|".join(row.tobytes().hex() for row in result)
        return SharmirsShares._alloc_c_string(joined)
# Callback objects offered to native callers, keyed by their exported name.
exports = dict(
    combine_wrapper=SharmirsShares.combine_wrapper,
    split_wrapper=SharmirsShares.split_wrapper,
)
"""
split_shares= asyncio.run(SharmirsShares().split_iShares("test", 3, 3))
print('Split shares: \'{}\''.format(split_shares))
shares = np.array([
[159, 88, 241, 80, 66],
[48, 36, 234, 104, 47],
[148, 88, 129, 27, 121]
])
combined_shares= asyncio.run(SharmirsShares().combine_Shares(shares))
print('Combined shares: \'{}\''.format(bytes(combined_shares).decode('utf-8')))
"""