7 from Crypto.Hash
import SHA512
8 from typing
import Optional
# Upper bound, in bytes, on the size requested from the stream per read()
# call in the chunked loop that computes
# `read_size = min(READ_CHUNK_SIZE, bytes_to_read)`.
11 READ_CHUNK_SIZE = 4096
# Reads a line from `stream` with readline() and adds its length to
# `total_bytes`; the annotated return type is int.
# NOTE(review): the embedded line numbers (14, 17-19, 24, 27) show that this
# listing omits several lines of the function -- the initialisation of
# `total_bytes`, the assignment of `l_str`, the loop/branch structure and the
# return statement are not visible here. Confirm against the complete file
# before changing any logic.
14 def _read_part_headers(stream: io.IOBase) -> int:
17 line = stream.readline()
18 bytes_read = len(line)
19 total_bytes = total_bytes + bytes_read
# A line equal to "\r\n" or "\n" is singled out here; presumably this is the
# blank line that terminates the header section -- TODO confirm, the body of
# this branch is not visible.
24 if l_str
in [
"\r\n",
"\n"]:
# Presumably guarded by a check that `bytes_read` is 0 (i.e. readline() hit
# end of stream) -- the guard itself is not visible, TODO confirm.
27 raise Exception(
"unexpected 0-length line")
# NOTE(review): the `def` line of this function is not visible in this
# listing; judging by the call `_read_to_file(stream, file_name,
# remaining_bytes)` later in the file it is presumably `_read_to_file` --
# TODO confirm. Other lines (the `file_name` parameter, any hashing, the
# write to `out_f`, handling of an empty chunk, and the return) are also
# missing from this view.
33 stream: io.BufferedIOBase,
35 bytes_to_read: int) -> Optional[bytes]:
37 Stream bytes to a file, while computing the digest. Return the digest or
38 None if there is an error.
42 with open(file_name,
"wb")
as out_f:
# Loop until the requested number of bytes has been consumed.
43 while bytes_to_read > 0:
# Never request more than READ_CHUNK_SIZE bytes at once, nor more than remain.
44 read_size = min(READ_CHUNK_SIZE, bytes_to_read)
45 chunk = stream.read(read_size)
# Count down by the number of bytes actually returned, not by `read_size`.
51 bytes_to_read = bytes_to_read - len(chunk)
# NOTE(review): the `def` line of this function is not visible in this
# listing; judging by the call `_read_to_memory(stream, final_boundary_size)`
# later in the file it is presumably `_read_to_memory` -- TODO confirm. The
# creation of `data` and whatever happens between the read and the countdown
# (original lines 63-66) are also missing from this view.
58 stream: io.BufferedIOBase,
59 bytes_to_read: int) -> Optional[bytes]:
# Loop until the requested number of bytes has been consumed.
61 while bytes_to_read > 0:
# The full remaining count is requested in a single read() call.
62 chunk = stream.read(bytes_to_read)
# Count down by the number of bytes actually returned.
67 bytes_to_read = bytes_to_read - len(chunk)
# `data` is defined on a line not shown; getvalue() suggests an in-memory
# buffer such as io.BytesIO -- TODO confirm.
69 return data.getvalue()
# NOTE(review): the `def` line (and therefore the name) of this function is
# not visible in this listing, and neither are the definitions of
# `content_length` and `expect_digest` used below -- presumably further
# parameters or values derived from headers; TODO confirm against the
# complete file.
74 content_boundary: str,
76 stream: io.BufferedIOBase,
77 file_name: str) ->
None:
79 Given sufficient header data and an input stream, stream raw content to a
80 file, hashing it at the same time to verify the given signature.
# Closing delimiter expected after the content: CRLF, "--", the boundary
# string, "--", CRLF.
83 final_boundary = f
"\r\n--{content_boundary}--\r\n"
# NOTE(review): this is the length of the str in characters, yet it is used
# below as a byte count; the two agree only if `content_boundary` encodes to
# one byte per character (e.g. ASCII) -- confirm.
84 final_boundary_size = len(final_boundary)
# Set aside the closing delimiter's length from the overall length.
96 remaining_bytes = content_length - final_boundary_size
99 header_bytes = _read_part_headers(stream)
# What is left after the part headers is passed as the number of bytes to
# stream into `file_name`.
100 remaining_bytes = remaining_bytes - header_bytes
104 digest = _read_to_file(stream, file_name, remaining_bytes)
# Presumably raised when `digest` is None -- the guard (original line 105)
# is not visible, TODO confirm.
106 raise Exception(
"invalid part format")
107 if digest != expect_digest:
108 raise Exception(
"digest mismatch")
# Read as many bytes as the closing delimiter is long and require them to
# decode to exactly that delimiter.
111 tail = _read_to_memory(stream, final_boundary_size)
112 if tail
is None or tail.decode() != final_boundary:
113 raise Exception(
"invalid part tail")