11 from cheroot.wsgi
import Server
as WSGIServer, PathInfoDispatcher
12 from cheroot.ssl.builtin
import BuiltinSSLAdapter
13 from .icontributionhandler
import IContributionHandler
14 from .interval
import Interval
15 from .server_configuration
import Configuration
16 from .server_state
import ServerState, initial_server_state
17 from .contributor_list
import ContributorList
19 from .upload_utils
import handle_upload_request
21 import_digest, export_verification_key, import_signature, verify
22 from typing
import cast, Optional, Callable, Iterable
23 from flask
import Flask, request, Request, Response, stream_with_context
24 from threading
import Thread, Lock
28 from logging
import info, warning, error
30 from os.path
import exists, join
# File from which a previously saved server state is loaded at start-up,
# when it exists.
STATE_FILE: str = "server_state.json"

# NOTE(review): presumably the scratch file that receives an uploaded
# contribution - its use is not visible in this excerpt; confirm.
UPLOAD_FILE: str = "upload.raw"

# Destination file handed to logging.basicConfig.
LOG_FILE: str = "server.log"

# Number of bytes read per chunk when streaming a file back to a client.
SEND_CHUNK_SIZE: int = 4096
Server to coordinate an MPC: it serves challenges and accepts responses
from contributors. Performs basic contribution management, ensuring
contributions are from the correct party and submitted within the correct
time window. MPC-specific operations (validation / challenge computation,
etc.) are performed by an IContributionHandler object.
49 handler: IContributionHandler,
50 server_config: Configuration,
53 logging.basicConfig(filename=LOG_FILE, level=logging.DEBUG)
55 self.
config = server_config
59 self.
state: ServerState
63 with open(self.
config.contributors_file,
"r")
as contributors_f:
64 self.
contributors = ContributorList.from_json(contributors_f.read())
65 print(f
"Contributors: {self.contributors}")
68 if exists(STATE_FILE):
69 info(f
"run_server: using existing state file: {STATE_FILE}")
70 with open(STATE_FILE,
"r")
as state_f:
71 self.
state = ServerState.from_json(state_f.read())
79 self.
server: Optional[WSGIServer] =
None
84 info(
"Waiting for MPC server to start ...")
86 port = self.
server.socket.getsockname()[1]
87 info(f
"MPC server started (port: {port}).")
90 if self.
server is not None:
92 while self.
server is not None:
93 info(
"Waiting for server to stop ...")
96 info(
"Server stopped.")
98 def _write_state_file(self) -> None:
99 info(f
"Writing state: {self.state_file_path}")
101 state_f.write(self.
state.to_json())
103 def _finalize_handler_once(self) -> None:
108 def _update_state(self, now: float) ->
None:
109 if self.
state.update(now, self.
config.contribution_interval):
112 def _on_contribution(self, now: float) ->
None:
113 next_deadline = now + self.
config.contribution_interval
114 self.
state.received_contribution(next_deadline)
117 def _on_next_contributor(self) -> None:
122 def _notify_next_contributor(self) -> None:
123 if self.
state.have_all_contributions()
or not self.
config.email_server:
126 contributor_idx = self.
state.next_contributor_index
127 idx_readable = contributor_idx + 1
128 total = self.
state.num_contributors
132 email_server=self.
config.email_server,
133 email_address=cast(str, self.
config.email_address),
134 email_password_file=cast(str, self.
config.email_password_file),
135 to_addr=contributor.email,
136 subject=f
"[MPC] Your timeslot has begun ({idx_readable}/{total})",
137 body=
"Please contribute to the MPC using your key: " +
139 except Exception
as ex:
140 print(f
"Failed to notify: {contributor.email}: {ex}")
141 error(f
"Failed to notify: {contributor.email}: {ex}")
143 def _tick(self) -> None:
145 info(
"_tick: processing. Ignoring tick")
151 def _contributors(self, _req: Request) -> Response:
def _state(self, _req: Request) -> Response:
    """
    Handler for the `/state` endpoint: report the current server state.

    The incoming request is not inspected. Returns a 200 response whose
    body is the JSON serialization of `self.state`. The response is
    explicitly labelled `application/json` so that clients do not receive
    JSON under the framework's default HTML content type.
    """
    return Response(
        self.state.to_json(), status=200, mimetype="application/json")
157 def _challenge(self, _req: Request) -> Response:
160 if self.
state.have_all_contributions():
162 "MPC is complete. No remaining challenges", 405)
167 def produce_file_chunks(path: str) -> Iterable[bytes]:
168 with open(path,
'rb')
as in_f:
170 buf = in_f.read(SEND_CHUNK_SIZE)
176 challenge_file = self.
handler.get_current_challenge_file(
177 self.
state.next_contributor_index)
179 stream_with_context(produce_file_chunks(challenge_file)),
180 mimetype=
"application/octet-stream")
182 def _contribute(self, req: Request) -> Response:
184 headers = req.headers
185 if 'Content-Length' not in headers:
186 raise Exception(
"no Content-Length header")
187 if 'Content-Type' not in headers:
188 raise Exception(
"no Content-Type header")
189 if 'X-MPC-Digest' not in headers:
190 raise Exception(
"no X-MPC-Digest header")
191 if 'X-MPC-Public-Key' not in headers:
192 raise Exception(
"no X-MPC-Public-Key header")
193 if 'X-MPC-Signature' not in headers:
194 raise Exception(
"no X-MPC-Signature header")
196 content_length =
int(headers[
'Content-Length'])
197 content_type = headers[
'Content-Type']
199 pub_key_str = headers.get(
'X-MPC-Public-Key')
203 for val
in content_type.split(
"; "):
204 if val.startswith(
"boundary="):
205 boundary = val[len(
"boundary="):]
208 raise Exception(
"content-type contains no boundary")
211 info(f
"Contribute: current time = {now}")
216 if self.
state.have_all_contributions():
217 return Response(
"MPC complete. No contributions accepted.", 405)
222 contributor_idx = self.
state.next_contributor_index
224 verification_key = contributor.verification_key
226 if expect_pub_key_str != pub_key_str:
228 f
"Contributor key mismatch (contributor {contributor_idx})",
236 if not verify(sig, verification_key, digest):
238 f
"Signature check failed (contributor {contributor_idx})",
249 cast(io.BufferedIOBase, request.stream),
257 info(f
"Launched thread for {self.state.next_contributor_index}" +
258 f
"/{self.state.num_contributors} contrib")
259 return Response(
"OK", 200)
261 def _process_contribution(self) -> None:
264 "_process_contribution(thread): processing contribution " +
265 f
"{self.state.next_contributor_index}" +
266 f
"/{self.state.num_contributors} (start={time.time()})")
268 if self.
handler.process_contribution(
269 self.
state.next_contributor_index,
272 info(f
"_process_contribution(thread): SUCCESS (finished {now})")
276 warning(
"_process_contribution(thread): contribution failed")
287 info(
"_process_contribution(thread): completed")
289 def _run(self) -> None:
291 app = Flask(__name__)
293 def _with_state_lock(
295 callback: Callable[[Request], Response]) -> Response:
298 return Response(
"Processing contribution. Retry later.", 503)
303 except Exception
as ex:
304 warning(f
"error in request: {ex}")
305 print(f
"error in request: {ex}")
306 return Response(
"error: {ex}", 400)
308 @app.route(
'/contributors', methods=[
'GET'])
@app.route('/state', methods=['GET'])
def state() -> Response:
    """
    GET /state: dispatch the current request to `self._state` through
    `_with_state_lock`.
    """
    handler = self._state
    return _with_state_lock(request, handler)
@app.route('/challenge', methods=['GET'])
def challenge() -> Response:
    """
    GET /challenge: dispatch the current request to `self._challenge`
    through `_with_state_lock`.
    """
    handler = self._challenge
    return _with_state_lock(request, handler)
320 @app.route(
'/contribute', methods=[
'POST'])
330 if not exists(self.
config.tls_certificate):
331 raise Exception(f
"no cert file {self.config.tls_certificate}")
332 if not exists(self.
config.tls_key):
333 raise Exception(f
"no key file {self.config.tls_key}")
335 listen_addr = (
'0.0.0.0', self.
config.port)
338 PathInfoDispatcher({
'/': app}),
340 self.
server.ssl_adapter = BuiltinSSLAdapter(
341 self.
config.tls_certificate,
343 print(f
"Listening on {listen_addr} ...")
353 email_password_file: str,
358 Send an email, given a server + credentials
360 from ssl
import create_default_context
361 from smtplib
import SMTP_SSL
362 from email.message
import EmailMessage
364 host_port = email_server.split(
":")
366 if len(host_port) == 2:
367 port =
int(host_port[1])
371 ssl_ctx = create_default_context()
372 with SMTP_SSL(host, port, context=ssl_ctx)
as smtp:
373 with open(email_password_file,
"r")
as passwd_f:
374 password = passwd_f.readline().rstrip()
375 smtp.login(email_address, password)
377 msg.set_content(f
"Subject: {subject}\n\n{body}")
378 msg[
'Subject'] = subject
379 msg[
'From'] = email_address
381 smtp.send_message(msg)