Zeth - Zerocash on Ethereum  0.8
Reference implementation of the Zeth protocol by Clearmatics
server.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 # Copyright (c) 2015-2022 Clearmatics Technologies Ltd
4 #
5 # SPDX-License-Identifier: LGPL-3.0+
6 
7 """
8 server command
9 """
10 
11 from cheroot.wsgi import Server as WSGIServer, PathInfoDispatcher # type: ignore
12 from cheroot.ssl.builtin import BuiltinSSLAdapter # type: ignore
13 from .icontributionhandler import IContributionHandler
14 from .interval import Interval
15 from .server_configuration import Configuration
16 from .server_state import ServerState, initial_server_state
17 from .contributor_list import ContributorList
18 
19 from .upload_utils import handle_upload_request
20 from .crypto import \
21  import_digest, export_verification_key, import_signature, verify
22 from typing import cast, Optional, Callable, Iterable
23 from flask import Flask, request, Request, Response, stream_with_context
24 from threading import Thread, Lock
25 import io
26 import time
27 import logging
28 from logging import info, warning, error
29 from os import remove
30 from os.path import exists, join
31 
32 STATE_FILE = "server_state.json"
33 UPLOAD_FILE = "upload.raw"
34 LOG_FILE = "server.log"
35 SEND_CHUNK_SIZE = 4096
36 
37 
38 class Server:
39  """
40  Server to coordinate an MPC, that serves challenges and accepts responses
41  from contributors. Performs basic contribution management, ensuring
42  contributions are from the correct party and submitted within the correct
43  time window. MPC-specific operations (validation / challenge computation,
44  etc) are performed by an IContributionHandler object.
45  """
46 
47  def __init__(
48  self,
49  handler: IContributionHandler,
50  server_config: Configuration,
51  server_dir: str):
52 
53  logging.basicConfig(filename=LOG_FILE, level=logging.DEBUG)
54  self.handler = handler
55  self.config = server_config
56  self.contributors: ContributorList
57  self.upload_file = join(server_dir, UPLOAD_FILE)
58  self.state_file_path = join(server_dir, STATE_FILE)
59  self.state: ServerState
60  self.processing = False
61 
62  # Try to open contributors file and state files. Perform sanity checks.
63  with open(self.config.contributors_file, "r") as contributors_f:
64  self.contributors = ContributorList.from_json(contributors_f.read())
65  print(f"Contributors: {self.contributors}")
66  self.contributors.ensure_validity()
67 
68  if exists(STATE_FILE):
69  info(f"run_server: using existing state file: {STATE_FILE}")
70  with open(STATE_FILE, "r") as state_f:
71  self.state = ServerState.from_json(state_f.read())
72  else:
76 
77  self.handler_finalized = self.state.have_all_contributions()
78  self.state_lock = Lock()
79  self.server: Optional[WSGIServer] = None
80  self.thread = Thread(target=self._run)
81  self.thread.start()
82 
83  while self.server is None or self.server.socket is None:
84  info("Waiting for MPC server to start ...")
85  time.sleep(1)
86  port = self.server.socket.getsockname()[1]
87  info(f"MPC server started (port: {port}).")
88 
89  def stop(self) -> None:
90  if self.server is not None:
91  self.server.stop()
92  while self.server is not None:
93  info("Waiting for server to stop ...")
94  time.sleep(1.0)
95  self.thread.join()
96  info("Server stopped.")
97 
98  def _write_state_file(self) -> None:
99  info(f"Writing state: {self.state_file_path}")
100  with open(self.state_file_path, "w") as state_f:
101  state_f.write(self.state.to_json())
102 
103  def _finalize_handler_once(self) -> None:
104  if (not self.handler_finalized) and self.state.have_all_contributions():
105  self.handler_finalized = True
106  self.handler.on_completed()
107 
108  def _update_state(self, now: float) -> None:
109  if self.state.update(now, self.config.contribution_interval):
110  self._on_next_contributor()
111 
112  def _on_contribution(self, now: float) -> None:
113  next_deadline = now + self.config.contribution_interval
114  self.state.received_contribution(next_deadline)
115  self._on_next_contributor()
116 
117  def _on_next_contributor(self) -> None:
119  self._write_state_file()
121 
122  def _notify_next_contributor(self) -> None:
123  if self.state.have_all_contributions() or not self.config.email_server:
124  return
125 
126  contributor_idx = self.state.next_contributor_index
127  idx_readable = contributor_idx + 1
128  total = self.state.num_contributors
129  contributor = self.contributors[contributor_idx]
130  try:
131  _send_mail(
132  email_server=self.config.email_server,
133  email_address=cast(str, self.config.email_address),
134  email_password_file=cast(str, self.config.email_password_file),
135  to_addr=contributor.email,
136  subject=f"[MPC] Your timeslot has begun ({idx_readable}/{total})",
137  body="Please contribute to the MPC using your key: " +
138  export_verification_key(contributor.verification_key))
139  except Exception as ex:
140  print(f"Failed to notify: {contributor.email}: {ex}")
141  error(f"Failed to notify: {contributor.email}: {ex}")
142 
143  def _tick(self) -> None:
144  if self.processing:
145  info("_tick: processing. Ignoring tick")
146  return
147 
148  with self.state_lock:
149  self._update_state(time.time())
150 
151  def _contributors(self, _req: Request) -> Response:
152  return Response(self.contributors.to_json(), 200)
153 
154  def _state(self, _req: Request) -> Response:
155  return Response(self.state.to_json(), 200)
156 
157  def _challenge(self, _req: Request) -> Response:
158  # TODO: Require authentication here, to avoid DoS?
159  self._update_state(time.time())
160  if self.state.have_all_contributions():
161  return Response(
162  "MPC is complete. No remaining challenges", 405)
163 
164  # Function used to stream the challenge file to the contributor.
165  # Streaming is required to avoid timing out while writing the
166  # full challenge file on the socket.
167  def produce_file_chunks(path: str) -> Iterable[bytes]:
168  with open(path, 'rb') as in_f:
169  while True:
170  buf = in_f.read(SEND_CHUNK_SIZE)
171  if buf:
172  yield buf
173  else:
174  break
175 
176  challenge_file = self.handler.get_current_challenge_file(
177  self.state.next_contributor_index)
178  return Response(
179  stream_with_context(produce_file_chunks(challenge_file)),
180  mimetype="application/octet-stream")
181 
182  def _contribute(self, req: Request) -> Response:
183  # Basic request check
184  headers = req.headers
185  if 'Content-Length' not in headers:
186  raise Exception("no Content-Length header")
187  if 'Content-Type' not in headers:
188  raise Exception("no Content-Type header")
189  if 'X-MPC-Digest' not in headers:
190  raise Exception("no X-MPC-Digest header")
191  if 'X-MPC-Public-Key' not in headers:
192  raise Exception("no X-MPC-Public-Key header")
193  if 'X-MPC-Signature' not in headers:
194  raise Exception("no X-MPC-Signature header")
195 
196  content_length = int(headers['Content-Length'])
197  content_type = headers['Content-Type']
198  digest = import_digest(headers['X-MPC-Digest'])
199  pub_key_str = headers.get('X-MPC-Public-Key')
200  sig = import_signature(headers['X-MPC-Signature'])
201 
202  boundary: str = ""
203  for val in content_type.split("; "):
204  if val.startswith("boundary="):
205  boundary = val[len("boundary="):]
206  break
207  if not boundary:
208  raise Exception("content-type contains no boundary")
209 
210  now = time.time()
211  info(f"Contribute: current time = {now}")
212 
213  # Update state using the current time and return an error if
214  # the MPC is no longer active.
215  self._update_state(now)
216  if self.state.have_all_contributions():
217  return Response("MPC complete. No contributions accepted.", 405)
218 
219  # Check that the public key matches the expected next
220  # contributor (as text, rather than relying on comparison
221  # operators)
222  contributor_idx = self.state.next_contributor_index
223  contributor = self.contributors[contributor_idx]
224  verification_key = contributor.verification_key
225  expect_pub_key_str = export_verification_key(verification_key)
226  if expect_pub_key_str != pub_key_str:
227  return Response(
228  f"Contributor key mismatch (contributor {contributor_idx})",
229  403)
230 
231  # Check signature correctness. Ensures that the uploader is the owner
232  # of the correct key BEFORE the costly file upload, taking as little
233  # time as possible with remote hosts other than the next contributor.
234  # (Note that this pre-upload check requires the digest to be passed in
235  # the HTTP header.)
236  if not verify(sig, verification_key, digest):
237  return Response(
238  f"Signature check failed (contributor {contributor_idx})",
239  403)
240 
241  # Accept the upload (if the digest matches). If successful,
242  # pass the file to the handler.
243  if exists(self.upload_file):
244  remove(self.upload_file)
246  content_length,
247  boundary,
248  digest,
249  cast(io.BufferedIOBase, request.stream),
250  self.upload_file)
251 
252  # Mark this instance as busy, launch a processing thread, and
253  # return (releasing the state lock). Until the processing thread
254  # has finished, further requests will just return 503.
255  self.processing = True
256  Thread(target=self._process_contribution).start()
257  info(f"Launched thread for {self.state.next_contributor_index}" +
258  f"/{self.state.num_contributors} contrib")
259  return Response("OK", 200)
260 
261  def _process_contribution(self) -> None:
262  try:
263  info(
264  "_process_contribution(thread): processing contribution " +
265  f"{self.state.next_contributor_index}" +
266  f"/{self.state.num_contributors} (start={time.time()})")
267 
268  if self.handler.process_contribution(
269  self.state.next_contributor_index,
270  self.upload_file):
271  now = time.time()
272  info(f"_process_contribution(thread): SUCCESS (finished {now})")
273  self._on_contribution(now)
274 
275  else:
276  warning("_process_contribution(thread): contribution failed")
277  return
278 
279  finally:
280  try:
281  # Remove the uploaded file if it is still there
282  if exists(self.upload_file):
283  remove(self.upload_file)
284  finally:
285  # Mark server as ready again
286  self.processing = False
287  info("_process_contribution(thread): completed")
288 
289  def _run(self) -> None:
290  # Server and end points
291  app = Flask(__name__)
292 
293  def _with_state_lock(
294  req: Request,
295  callback: Callable[[Request], Response]) -> Response:
296 
297  if self.processing:
298  return Response("Processing contribution. Retry later.", 503)
299 
300  with self.state_lock:
301  try:
302  return callback(req)
303  except Exception as ex:
304  warning(f"error in request: {ex}")
305  print(f"error in request: {ex}")
306  return Response("error: {ex}", 400)
307 
308  @app.route('/contributors', methods=['GET'])
309  def contributors() -> Response: # pylint: disable=unused-variable
310  return _with_state_lock(request, self._contributors)
311 
312  @app.route('/state', methods=['GET'])
313  def state() -> Response: # pylint: disable=unused-variable
314  return _with_state_lock(request, self._state)
315 
316  @app.route('/challenge', methods=['GET'])
317  def challenge() -> Response: # pylint: disable=unused-variable
318  return _with_state_lock(request, self._challenge)
319 
320  @app.route('/contribute', methods=['POST'])
321  def contribute() -> Response: # pylint: disable=unused-variable
322  return _with_state_lock(request, self._contribute)
323 
324  def _tick() -> None:
325  with self.state_lock:
326  self._update_state(time.time())
327 
328  interval = Interval(60.0, _tick)
329  try:
330  if not exists(self.config.tls_certificate):
331  raise Exception(f"no cert file {self.config.tls_certificate}")
332  if not exists(self.config.tls_key):
333  raise Exception(f"no key file {self.config.tls_key}")
334 
335  listen_addr = ('0.0.0.0', self.config.port)
336  self.server = WSGIServer(
337  listen_addr,
338  PathInfoDispatcher({'/': app}),
339  numthreads=1)
340  self.server.ssl_adapter = BuiltinSSLAdapter(
341  self.config.tls_certificate,
342  self.config.tls_key)
343  print(f"Listening on {listen_addr} ...")
344  self.server.start()
345  finally:
346  interval.stop()
347  self.server = None
348 
349 
350 def _send_mail(
351  email_server: str,
352  email_address: str,
353  email_password_file: str,
354  to_addr: str,
355  subject: str,
356  body: str) -> None:
357  """
358  Send an email, given a server + credentials
359  """
360  from ssl import create_default_context
361  from smtplib import SMTP_SSL
362  from email.message import EmailMessage
363 
364  host_port = email_server.split(":")
365  host = host_port[0]
366  if len(host_port) == 2:
367  port = int(host_port[1])
368  else:
369  port = 465
370 
371  ssl_ctx = create_default_context()
372  with SMTP_SSL(host, port, context=ssl_ctx) as smtp:
373  with open(email_password_file, "r") as passwd_f:
374  password = passwd_f.readline().rstrip()
375  smtp.login(email_address, password)
376  msg = EmailMessage()
377  msg.set_content(f"Subject: {subject}\n\n{body}")
378  msg['Subject'] = subject
379  msg['From'] = email_address
380  msg['To'] = to_addr
381  smtp.send_message(msg)
coordinator.server.Server._on_next_contributor
None _on_next_contributor(self)
Definition: server.py:117
coordinator.server.Server._contributors
Response _contributors(self, Request _req)
Definition: server.py:151
coordinator.interval.Interval
Definition: interval.py:12
zeth.cli.zeth_deploy.int
int
Definition: zeth_deploy.py:27
coordinator.crypto.export_verification_key
str export_verification_key(ecdsa.VerifyingKey vk)
Definition: crypto.py:66
coordinator.server.Server.state_lock
state_lock
Definition: server.py:74
coordinator.server.Server._finalize_handler_once
None _finalize_handler_once(self)
Definition: server.py:103
coordinator.server_state.initial_server_state
ServerState initial_server_state(Configuration configuration, ContributorList contributors)
Definition: server_state.py:86
coordinator.server.Server._challenge
Response _challenge(self, Request _req)
Definition: server.py:157
coordinator.server.Server
Definition: server.py:38
coordinator.crypto.import_digest
bytes import_digest(str digest_s)
Definition: crypto.py:40
coordinator.server.Server.config
config
Definition: server.py:51
coordinator.upload_utils.handle_upload_request
None handle_upload_request(int content_length, str content_boundary, bytes expect_digest, io.BufferedIOBase stream, str file_name)
Definition: upload_utils.py:72
coordinator.server.Server.stop
None stop(self)
Definition: server.py:89
coordinator.server.Server._update_state
None _update_state(self, float now)
Definition: server.py:108
zeth.core.signing.verify
bool verify(SigningVerificationKey vk, bytes m, int sigma)
Definition: signing.py:145
coordinator.server.Server._state
Response _state(self, Request _req)
Definition: server.py:154
coordinator.server.Server.__init__
def __init__(self, IContributionHandler handler, Configuration server_config, str server_dir)
Definition: server.py:47
coordinator.server.Server.state_file_path
state_file_path
Definition: server.py:54
coordinator.server.Server._on_contribution
None _on_contribution(self, float now)
Definition: server.py:112
coordinator.server.Server._contribute
Response _contribute(self, Request req)
Definition: server.py:182
coordinator.contribute.contribute
None contribute(str base_url, str key_file, str challenge_file, Callable[[], str] contribute_cb, int wait_interval, Optional[str] server_certificate, bool insecure)
Definition: contribute.py:65
coordinator.server.Server.state
state
Definition: server.py:67
coordinator.server.Server._run
None _run(self)
Definition: server.py:289
coordinator.server.Server._notify_next_contributor
None _notify_next_contributor(self)
Definition: server.py:122
coordinator.server.Server.handler
handler
Definition: server.py:50
coordinator.crypto.import_signature
bytes import_signature(str sig_s)
Definition: crypto.py:78
coordinator.server.Server.server
server
Definition: server.py:336
coordinator.server.Server.processing
processing
Definition: server.py:56
coordinator.server.Server._write_state_file
None _write_state_file(self)
Definition: server.py:98
coordinator.server.Server.thread
thread
Definition: server.py:76
coordinator.server.Server.handler_finalized
handler_finalized
Definition: server.py:73
coordinator.server.Server.contributors
contributors
Definition: server.py:60
coordinator.server.Server._process_contribution
None _process_contribution(self)
Definition: server.py:261
coordinator.server.Server.upload_file
upload_file
Definition: server.py:53