9 from __future__
import annotations
10 from .
import constants
16 from os.path
import join, dirname, normpath, exists
19 from web3
import Web3, HTTPProvider
20 from typing
import Sequence, List, Tuple, Union, Iterable, Any, Optional, cast
25 WEB3_HTTP_PROVIDER_TIMEOUT_SEC = 60
30 certificate: Optional[str] =
None,
31 insecure: bool =
False) -> Any:
33 Create a Web3 context from an http URL.
35 if certificate
and not exists(certificate):
36 raise FileNotFoundError(f
"certificate file not found: {certificate}")
37 assert not certificate
or exists(certificate)
38 request_verify: Union[str, bool,
None] =
False if insecure
else certificate
40 'timeout': WEB3_HTTP_PROVIDER_TIMEOUT_SEC,
41 'verify': request_verify,
43 return Web3(HTTPProvider(url, request_kwargs=request_kwargs))
48 Representation of some amount of Ether (or any token) in terms of Wei.
49 Disambiguates Ether values from other units such as zeth_units.
51 def __init__(self, val: Union[str, int, float], units: str =
'ether'):
52 self.
wei = Web3.toWei(val, units)
57 def __add__(self, other: EtherValue) -> EtherValue:
60 def __sub__(self, other: EtherValue) -> EtherValue:
63 def __eq__(self, other: Any) -> bool:
64 if not isinstance(other, EtherValue):
66 return self.
wei == other.wei
68 def __ne__(self, other: Any) -> bool:
69 return not self.
__eq__(other)
71 def __lt__(self, other: EtherValue) -> bool:
72 return self.
wei < other.wei
74 def __le__(self, other: EtherValue) -> bool:
75 return self.
wei <= other.wei
77 def __gt__(self, other: EtherValue) -> bool:
78 return self.
wei > other.wei
80 def __ge__(self, other: EtherValue) -> bool:
81 return self.
wei >= other.wei
87 return str(Web3.fromWei(self.
wei,
'ether'))
92 Typed wrapper around eth_abi.encode_single
94 return eth_abi.encode_single(type_name, data)
97 def encode_abi(type_names: List[str], data: List[Any]) -> bytes:
99 Typed wrapper around eth_abi.encode_abi
101 return eth_abi.encode_abi(type_names, data)
106 Binary encoding of ethereum address to 20 bytes
109 assert len(eth_addr) == 42
110 assert eth_addr.startswith(
"0x")
111 return bytes.fromhex(eth_addr[2:])
116 Binary encoding of ethereum address to 32 bytes
122 assert isinstance(eth_uint256, str)
123 assert eth_uint256.startswith(
"0x")
124 return int.from_bytes(
130 pk = eth_keys.keys.PrivateKey(eth_private_key)
131 return pk.public_key.to_address()
136 Decode prefixed / non-prefixed hex string and extract the length in bytes
137 as well as the value.
139 assert len(value_hex) % 2 == 0
140 if value_hex.startswith(
"0x"):
141 num_bytes =
int((len(value_hex) - 2) / 2)
143 num_bytes =
int(len(value_hex) / 2)
144 return (
int(value_hex, 16), num_bytes)
149 Create prefixed hex string enforcing a specific byte-length.
151 return "0x" + value.to_bytes(num_bytes, byteorder=
'big').hex()
155 return number.to_bytes(8,
'big')
163 if len(digest) % 2 == 1:
164 digest =
"0" + digest
165 return "".join([
"{0:04b}".format(
int(c, 16))
for c
in digest])
169 return "".join([
"{0:08b}".format(b)
for b
in digest])
174 Given a hex string of arbitrary size, split into uint256 ints, left padding
177 if hex_str.startswith(
"0x"):
178 hex_str = hex_str[2:]
179 assert len(hex_str) % 2 == 0
181 next_idx = len(hex_str) -
int((len(hex_str) - 1) / 64) * 64
182 while next_idx <= len(hex_str):
183 sub_str = hex_str[start_idx:next_idx]
184 yield int(sub_str, 16)
186 next_idx = next_idx + 64
190 elements: Sequence[Union[str, List[str]]]) -> List[int]:
192 Given an array of hex strings, return an array of int values by converting
193 each hex string to evm uint256 words, and flattening the final list.
204 Pad value on the left with zeros, to make 32 bytes.
206 assert len(value) <= 32
207 return bytes(32-len(value)) + value
212 Extend a hex string to represent 32 bytes
215 if len(res) % 2 != 0:
222 Convert a quantity of ether / token to Zeth units
224 return int(value.wei / constants.ZETH_PUBLIC_UNIT_VALUE)
229 Convert a quantity of ether / token to Zeth units
231 return EtherValue(zeth_units * constants.ZETH_PUBLIC_UNIT_VALUE,
"wei")
236 Parse the zksnark argument and return its value
238 parser = argparse.ArgumentParser(
239 description=
"Testing Zeth transactions using the specified zkSNARK " +
240 "('GROTH16' or 'PGHR13').\nNote that the zkSNARK must match the one " +
241 "used on the prover server.")
242 parser.add_argument(
"zksnark", help=
"Set the zkSNARK to use")
243 args = parser.parse_args()
244 if args.zksnark
not in constants.VALID_ZKSNARKS:
245 return sys.exit(errors.SNARK_NOT_SUPPORTED)
250 return os.environ.get(
252 normpath(join(dirname(__file__),
"..",
"..",
"..")))
256 return os.environ.get(
257 'ZETH_CONTRACTS_DIR',
263 Flatten a list containing strings or lists of strings.
265 if any(isinstance(el, (list, tuple))
for el
in str_list):
268 if isinstance(el, (list, tuple)):
271 strs.append(cast(str, el))
274 return cast(List[str], str_list)
280 Encode a list of variables, or list of lists of variables into a byte
286 data_bytes = bytearray()
292 if isinstance(m, int):
293 m_hex =
"{0:0>4X}".format(m)
294 elif isinstance(m, str)
and (m[1] ==
"x"):
308 Summary of the commitment value, in some standard format.