mirror of
				https://github.com/matrix-org/synapse.git
				synced 2025-10-21 20:31:08 +02:00 
			
		
		
		
	
		
			
				
	
	
		
			590 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			590 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python
 | |
| # Copyright 2021 The Matrix.org Foundation C.I.C.
 | |
| #
 | |
| # Licensed under the Apache License, Version 2.0 (the "License");
 | |
| # you may not use this file except in compliance with the License.
 | |
| # You may obtain a copy of the License at
 | |
| #
 | |
| #     http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing, software
 | |
| # distributed under the License is distributed on an "AS IS" BASIS,
 | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| # See the License for the specific language governing permissions and
 | |
| # limitations under the License.
 | |
| 
 | |
| # This script reads environment variables and generates a shared Synapse worker,
 | |
| # nginx and supervisord configs depending on the workers requested.
 | |
| #
 | |
| # The environment variables it reads are:
 | |
| #   * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver.
 | |
| #   * SYNAPSE_REPORT_STATS: Whether to report stats.
 | |
| #   * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKER_CONFIG
 | |
| #         below. Leave empty for no workers, or set to '*' for all possible workers.
 | |
| #
 | |
| # NOTE: According to Complement's ENTRYPOINT expectations for a homeserver image (as defined
 | |
| # in the project's README), this script may be run multiple times, and functionality should
 | |
| # continue to work if so.
 | |
| 
 | |
| import os
 | |
| import subprocess
 | |
| import sys
 | |
| from typing import Any, Dict, List, Mapping, MutableMapping, NoReturn, Set
 | |
| 
 | |
| import jinja2
 | |
| import yaml
 | |
| 
 | |
| MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
 | |
| 
 | |
| 
 | |
| WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
 | |
|     "pusher": {
 | |
|         "app": "synapse.app.pusher",
 | |
|         "listener_resources": [],
 | |
|         "endpoint_patterns": [],
 | |
|         "shared_extra_conf": {"start_pushers": False},
 | |
|         "worker_extra_conf": "",
 | |
|     },
 | |
|     "user_dir": {
 | |
|         "app": "synapse.app.user_dir",
 | |
|         "listener_resources": ["client"],
 | |
|         "endpoint_patterns": [
 | |
|             "^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
 | |
|         ],
 | |
|         "shared_extra_conf": {"update_user_directory": False},
 | |
|         "worker_extra_conf": "",
 | |
|     },
 | |
|     "media_repository": {
 | |
|         "app": "synapse.app.media_repository",
 | |
|         "listener_resources": ["media"],
 | |
|         "endpoint_patterns": [
 | |
|             "^/_matrix/media/",
 | |
|             "^/_synapse/admin/v1/purge_media_cache$",
 | |
|             "^/_synapse/admin/v1/room/.*/media.*$",
 | |
|             "^/_synapse/admin/v1/user/.*/media.*$",
 | |
|             "^/_synapse/admin/v1/media/.*$",
 | |
|             "^/_synapse/admin/v1/quarantine_media/.*$",
 | |
|         ],
 | |
|         "shared_extra_conf": {"enable_media_repo": False},
 | |
|         "worker_extra_conf": "enable_media_repo: true",
 | |
|     },
 | |
|     "appservice": {
 | |
|         "app": "synapse.app.generic_worker",
 | |
|         "listener_resources": [],
 | |
|         "endpoint_patterns": [],
 | |
|         "shared_extra_conf": {"notify_appservices_from_worker": "appservice"},
 | |
|         "worker_extra_conf": "",
 | |
|     },
 | |
|     "federation_sender": {
 | |
|         "app": "synapse.app.federation_sender",
 | |
|         "listener_resources": [],
 | |
|         "endpoint_patterns": [],
 | |
|         "shared_extra_conf": {"send_federation": False},
 | |
|         "worker_extra_conf": "",
 | |
|     },
 | |
|     "synchrotron": {
 | |
|         "app": "synapse.app.generic_worker",
 | |
|         "listener_resources": ["client"],
 | |
|         "endpoint_patterns": [
 | |
|             "^/_matrix/client/(v2_alpha|r0|v3)/sync$",
 | |
|             "^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$",
 | |
|             "^/_matrix/client/(api/v1|r0|v3)/initialSync$",
 | |
|             "^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$",
 | |
|         ],
 | |
|         "shared_extra_conf": {},
 | |
|         "worker_extra_conf": "",
 | |
|     },
 | |
|     "federation_reader": {
 | |
|         "app": "synapse.app.generic_worker",
 | |
|         "listener_resources": ["federation"],
 | |
|         "endpoint_patterns": [
 | |
|             "^/_matrix/federation/(v1|v2)/event/",
 | |
|             "^/_matrix/federation/(v1|v2)/state/",
 | |
|             "^/_matrix/federation/(v1|v2)/state_ids/",
 | |
|             "^/_matrix/federation/(v1|v2)/backfill/",
 | |
|             "^/_matrix/federation/(v1|v2)/get_missing_events/",
 | |
|             "^/_matrix/federation/(v1|v2)/publicRooms",
 | |
|             "^/_matrix/federation/(v1|v2)/query/",
 | |
|             "^/_matrix/federation/(v1|v2)/make_join/",
 | |
|             "^/_matrix/federation/(v1|v2)/make_leave/",
 | |
|             "^/_matrix/federation/(v1|v2)/send_join/",
 | |
|             "^/_matrix/federation/(v1|v2)/send_leave/",
 | |
|             "^/_matrix/federation/(v1|v2)/invite/",
 | |
|             "^/_matrix/federation/(v1|v2)/query_auth/",
 | |
|             "^/_matrix/federation/(v1|v2)/event_auth/",
 | |
|             "^/_matrix/federation/(v1|v2)/exchange_third_party_invite/",
 | |
|             "^/_matrix/federation/(v1|v2)/user/devices/",
 | |
|             "^/_matrix/federation/(v1|v2)/get_groups_publicised$",
 | |
|             "^/_matrix/key/v2/query",
 | |
|         ],
 | |
|         "shared_extra_conf": {},
 | |
|         "worker_extra_conf": "",
 | |
|     },
 | |
|     "federation_inbound": {
 | |
|         "app": "synapse.app.generic_worker",
 | |
|         "listener_resources": ["federation"],
 | |
|         "endpoint_patterns": ["/_matrix/federation/(v1|v2)/send/"],
 | |
|         "shared_extra_conf": {},
 | |
|         "worker_extra_conf": "",
 | |
|     },
 | |
|     "event_persister": {
 | |
|         "app": "synapse.app.generic_worker",
 | |
|         "listener_resources": ["replication"],
 | |
|         "endpoint_patterns": [],
 | |
|         "shared_extra_conf": {},
 | |
|         "worker_extra_conf": "",
 | |
|     },
 | |
|     "background_worker": {
 | |
|         "app": "synapse.app.generic_worker",
 | |
|         "listener_resources": [],
 | |
|         "endpoint_patterns": [],
 | |
|         # This worker cannot be sharded. Therefore there should only ever be one background
 | |
|         # worker, and it should be named background_worker1
 | |
|         "shared_extra_conf": {"run_background_tasks_on": "background_worker1"},
 | |
|         "worker_extra_conf": "",
 | |
|     },
 | |
|     "event_creator": {
 | |
|         "app": "synapse.app.generic_worker",
 | |
|         "listener_resources": ["client"],
 | |
|         "endpoint_patterns": [
 | |
|             "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact",
 | |
|             "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send",
 | |
|             "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$",
 | |
|             "^/_matrix/client/(api/v1|r0|v3|unstable)/join/",
 | |
|             "^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
 | |
|         ],
 | |
|         "shared_extra_conf": {},
 | |
|         "worker_extra_conf": "",
 | |
|     },
 | |
|     "frontend_proxy": {
 | |
|         "app": "synapse.app.frontend_proxy",
 | |
|         "listener_resources": ["client", "replication"],
 | |
|         "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
 | |
|         "shared_extra_conf": {},
 | |
|         "worker_extra_conf": (
 | |
|             "worker_main_http_uri: http://127.0.0.1:%d"
 | |
|             % (MAIN_PROCESS_HTTP_LISTENER_PORT,)
 | |
|         ),
 | |
|     },
 | |
| }
 | |
| 
 | |
| # Templates for sections that may be inserted multiple times in config files
 | |
| SUPERVISORD_PROCESS_CONFIG_BLOCK = """
 | |
| [program:synapse_{name}]
 | |
| command=/usr/local/bin/prefix-log /usr/local/bin/python -m {app} \
 | |
|     --config-path="{config_path}" \
 | |
|     --config-path=/conf/workers/shared.yaml \
 | |
|     --config-path=/conf/workers/{name}.yaml
 | |
| autorestart=unexpected
 | |
| priority=500
 | |
| exitcodes=0
 | |
| stdout_logfile=/dev/stdout
 | |
| stdout_logfile_maxbytes=0
 | |
| stderr_logfile=/dev/stderr
 | |
| stderr_logfile_maxbytes=0
 | |
| """
 | |
| 
 | |
| NGINX_LOCATION_CONFIG_BLOCK = """
 | |
|     location ~* {endpoint} {{
 | |
|         proxy_pass {upstream};
 | |
|         proxy_set_header X-Forwarded-For $remote_addr;
 | |
|         proxy_set_header X-Forwarded-Proto $scheme;
 | |
|         proxy_set_header Host $host;
 | |
|     }}
 | |
| """
 | |
| 
 | |
| NGINX_UPSTREAM_CONFIG_BLOCK = """
 | |
| upstream {upstream_worker_type} {{
 | |
| {body}
 | |
| }}
 | |
| """
 | |
| 
 | |
| 
 | |
| # Utility functions
 | |
| def log(txt: str) -> None:
 | |
|     """Log something to the stdout.
 | |
| 
 | |
|     Args:
 | |
|         txt: The text to log.
 | |
|     """
 | |
|     print(txt)
 | |
| 
 | |
| 
 | |
| def error(txt: str) -> NoReturn:
 | |
|     """Log something and exit with an error code.
 | |
| 
 | |
|     Args:
 | |
|         txt: The text to log in error.
 | |
|     """
 | |
|     log(txt)
 | |
|     sys.exit(2)
 | |
| 
 | |
| 
 | |
| def convert(src: str, dst: str, **template_vars: object) -> None:
 | |
|     """Generate a file from a template
 | |
| 
 | |
|     Args:
 | |
|         src: Path to the input file.
 | |
|         dst: Path to write to.
 | |
|         template_vars: The arguments to replace placeholder variables in the template with.
 | |
|     """
 | |
|     # Read the template file
 | |
|     with open(src) as infile:
 | |
|         template = infile.read()
 | |
| 
 | |
|     # Generate a string from the template. We disable autoescape to prevent template
 | |
|     # variables from being escaped.
 | |
|     rendered = jinja2.Template(template, autoescape=False).render(**template_vars)
 | |
| 
 | |
|     # Write the generated contents to a file
 | |
|     #
 | |
|     # We use append mode in case the files have already been written to by something else
 | |
|     # (for instance, as part of the instructions in a dockerfile).
 | |
|     with open(dst, "a") as outfile:
 | |
|         # In case the existing file doesn't end with a newline
 | |
|         outfile.write("\n")
 | |
| 
 | |
|         outfile.write(rendered)
 | |
| 
 | |
| 
 | |
| def add_sharding_to_shared_config(
 | |
|     shared_config: dict,
 | |
|     worker_type: str,
 | |
|     worker_name: str,
 | |
|     worker_port: int,
 | |
| ) -> None:
 | |
|     """Given a dictionary representing a config file shared across all workers,
 | |
|     append sharded worker information to it for the current worker_type instance.
 | |
| 
 | |
|     Args:
 | |
|         shared_config: The config dict that all worker instances share (after being converted to YAML)
 | |
|         worker_type: The type of worker (one of those defined in WORKERS_CONFIG).
 | |
|         worker_name: The name of the worker instance.
 | |
|         worker_port: The HTTP replication port that the worker instance is listening on.
 | |
|     """
 | |
|     # The instance_map config field marks the workers that write to various replication streams
 | |
|     instance_map = shared_config.setdefault("instance_map", {})
 | |
| 
 | |
|     # Worker-type specific sharding config
 | |
|     if worker_type == "pusher":
 | |
|         shared_config.setdefault("pusher_instances", []).append(worker_name)
 | |
| 
 | |
|     elif worker_type == "federation_sender":
 | |
|         shared_config.setdefault("federation_sender_instances", []).append(worker_name)
 | |
| 
 | |
|     elif worker_type == "event_persister":
 | |
|         # Event persisters write to the events stream, so we need to update
 | |
|         # the list of event stream writers
 | |
|         shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
 | |
|             worker_name
 | |
|         )
 | |
| 
 | |
|         # Map of stream writer instance names to host/ports combos
 | |
|         instance_map[worker_name] = {
 | |
|             "host": "localhost",
 | |
|             "port": worker_port,
 | |
|         }
 | |
| 
 | |
|     elif worker_type == "media_repository":
 | |
|         # The first configured media worker will run the media background jobs
 | |
|         shared_config.setdefault("media_instance_running_background_jobs", worker_name)
 | |
| 
 | |
| 
 | |
| def generate_base_homeserver_config() -> None:
 | |
|     """Starts Synapse and generates a basic homeserver config, which will later be
 | |
|     modified for worker support.
 | |
| 
 | |
|     Raises: CalledProcessError if calling start.py returned a non-zero exit code.
 | |
|     """
 | |
|     # start.py already does this for us, so just call that.
 | |
|     # note that this script is copied in in the official, monolith dockerfile
 | |
|     os.environ["SYNAPSE_HTTP_PORT"] = str(MAIN_PROCESS_HTTP_LISTENER_PORT)
 | |
|     subprocess.check_output(["/usr/local/bin/python", "/start.py", "migrate_config"])
 | |
| 
 | |
| 
 | |
| def generate_worker_files(
 | |
|     environ: Mapping[str, str], config_path: str, data_dir: str
 | |
| ) -> None:
 | |
|     """Read the desired list of workers from environment variables and generate
 | |
|     shared homeserver, nginx and supervisord configs.
 | |
| 
 | |
|     Args:
 | |
|         environ: os.environ instance.
 | |
|         config_path: The location of the generated Synapse main worker config file.
 | |
|         data_dir: The location of the synapse data directory. Where log and
 | |
|             user-facing config files live.
 | |
|     """
 | |
|     # Note that yaml cares about indentation, so care should be taken to insert lines
 | |
|     # into files at the correct indentation below.
 | |
| 
 | |
|     # shared_config is the contents of a Synapse config file that will be shared amongst
 | |
|     # the main Synapse process as well as all workers.
 | |
|     # It is intended mainly for disabling functionality when certain workers are spun up,
 | |
|     # and adding a replication listener.
 | |
| 
 | |
|     # First read the original config file and extract the listeners block. Then we'll add
 | |
|     # another listener for replication. Later we'll write out the result to the shared
 | |
|     # config file.
 | |
|     listeners = [
 | |
|         {
 | |
|             "port": 9093,
 | |
|             "bind_address": "127.0.0.1",
 | |
|             "type": "http",
 | |
|             "resources": [{"names": ["replication"]}],
 | |
|         }
 | |
|     ]
 | |
|     with open(config_path) as file_stream:
 | |
|         original_config = yaml.safe_load(file_stream)
 | |
|         original_listeners = original_config.get("listeners")
 | |
|         if original_listeners:
 | |
|             listeners += original_listeners
 | |
| 
 | |
|     # The shared homeserver config. The contents of which will be inserted into the
 | |
|     # base shared worker jinja2 template.
 | |
|     #
 | |
|     # This config file will be passed to all workers, included Synapse's main process.
 | |
|     shared_config: Dict[str, Any] = {"listeners": listeners}
 | |
| 
 | |
|     # The supervisord config. The contents of which will be inserted into the
 | |
|     # base supervisord jinja2 template.
 | |
|     #
 | |
|     # Supervisord will be in charge of running everything, from redis to nginx to Synapse
 | |
|     # and all of its worker processes. Load the config template, which defines a few
 | |
|     # services that are necessary to run.
 | |
|     supervisord_config = ""
 | |
| 
 | |
|     # Upstreams for load-balancing purposes. This dict takes the form of a worker type to the
 | |
|     # ports of each worker. For example:
 | |
|     # {
 | |
|     #   worker_type: {1234, 1235, ...}}
 | |
|     # }
 | |
|     # and will be used to construct 'upstream' nginx directives.
 | |
|     nginx_upstreams: Dict[str, Set[int]] = {}
 | |
| 
 | |
|     # A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what will be
 | |
|     # placed after the proxy_pass directive. The main benefit to representing this data as a
 | |
|     # dict over a str is that we can easily deduplicate endpoints across multiple instances
 | |
|     # of the same worker.
 | |
|     #
 | |
|     # An nginx site config that will be amended to depending on the workers that are
 | |
|     # spun up. To be placed in /etc/nginx/conf.d.
 | |
|     nginx_locations = {}
 | |
| 
 | |
|     # Read the desired worker configuration from the environment
 | |
|     worker_types_env = environ.get("SYNAPSE_WORKER_TYPES")
 | |
|     if worker_types_env is None:
 | |
|         # No workers, just the main process
 | |
|         worker_types = []
 | |
|     else:
 | |
|         # Split type names by comma
 | |
|         worker_types = worker_types_env.split(",")
 | |
| 
 | |
|     # Create the worker configuration directory if it doesn't already exist
 | |
|     os.makedirs("/conf/workers", exist_ok=True)
 | |
| 
 | |
|     # Start worker ports from this arbitrary port
 | |
|     worker_port = 18009
 | |
| 
 | |
|     # A counter of worker_type -> int. Used for determining the name for a given
 | |
|     # worker type when generating its config file, as each worker's name is just
 | |
|     # worker_type + instance #
 | |
|     worker_type_counter: Dict[str, int] = {}
 | |
| 
 | |
|     # A list of internal endpoints to healthcheck, starting with the main process
 | |
|     # which exists even if no workers do.
 | |
|     healthcheck_urls = ["http://localhost:8080/health"]
 | |
| 
 | |
|     # For each worker type specified by the user, create config values
 | |
|     for worker_type in worker_types:
 | |
|         worker_type = worker_type.strip()
 | |
| 
 | |
|         worker_config = WORKERS_CONFIG.get(worker_type)
 | |
|         if worker_config:
 | |
|             worker_config = worker_config.copy()
 | |
|         else:
 | |
|             log(worker_type + " is an unknown worker type! It will be ignored")
 | |
|             continue
 | |
| 
 | |
|         new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1
 | |
|         worker_type_counter[worker_type] = new_worker_count
 | |
| 
 | |
|         # Name workers by their type concatenated with an incrementing number
 | |
|         # e.g. federation_reader1
 | |
|         worker_name = worker_type + str(new_worker_count)
 | |
|         worker_config.update(
 | |
|             {"name": worker_name, "port": str(worker_port), "config_path": config_path}
 | |
|         )
 | |
| 
 | |
|         # Update the shared config with any worker-type specific options
 | |
|         shared_config.update(worker_config["shared_extra_conf"])
 | |
| 
 | |
|         healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
 | |
| 
 | |
|         # Check if more than one instance of this worker type has been specified
 | |
|         worker_type_total_count = worker_types.count(worker_type)
 | |
|         if worker_type_total_count > 1:
 | |
|             # Update the shared config with sharding-related options if necessary
 | |
|             add_sharding_to_shared_config(
 | |
|                 shared_config, worker_type, worker_name, worker_port
 | |
|             )
 | |
| 
 | |
|         # Enable the worker in supervisord
 | |
|         supervisord_config += SUPERVISORD_PROCESS_CONFIG_BLOCK.format_map(worker_config)
 | |
| 
 | |
|         # Add nginx location blocks for this worker's endpoints (if any are defined)
 | |
|         for pattern in worker_config["endpoint_patterns"]:
 | |
|             # Determine whether we need to load-balance this worker
 | |
|             if worker_type_total_count > 1:
 | |
|                 # Create or add to a load-balanced upstream for this worker
 | |
|                 nginx_upstreams.setdefault(worker_type, set()).add(worker_port)
 | |
| 
 | |
|                 # Upstreams are named after the worker_type
 | |
|                 upstream = "http://" + worker_type
 | |
|             else:
 | |
|                 upstream = "http://localhost:%d" % (worker_port,)
 | |
| 
 | |
|             # Note that this endpoint should proxy to this upstream
 | |
|             nginx_locations[pattern] = upstream
 | |
| 
 | |
|         # Write out the worker's logging config file
 | |
| 
 | |
|         log_config_filepath = generate_worker_log_config(environ, worker_name, data_dir)
 | |
| 
 | |
|         # Then a worker config file
 | |
|         convert(
 | |
|             "/conf/worker.yaml.j2",
 | |
|             "/conf/workers/{name}.yaml".format(name=worker_name),
 | |
|             **worker_config,
 | |
|             worker_log_config_filepath=log_config_filepath,
 | |
|         )
 | |
| 
 | |
|         worker_port += 1
 | |
| 
 | |
|     # Build the nginx location config blocks
 | |
|     nginx_location_config = ""
 | |
|     for endpoint, upstream in nginx_locations.items():
 | |
|         nginx_location_config += NGINX_LOCATION_CONFIG_BLOCK.format(
 | |
|             endpoint=endpoint,
 | |
|             upstream=upstream,
 | |
|         )
 | |
| 
 | |
|     # Determine the load-balancing upstreams to configure
 | |
|     nginx_upstream_config = ""
 | |
| 
 | |
|     for upstream_worker_type, upstream_worker_ports in nginx_upstreams.items():
 | |
|         body = ""
 | |
|         for port in upstream_worker_ports:
 | |
|             body += "    server localhost:%d;\n" % (port,)
 | |
| 
 | |
|         # Add to the list of configured upstreams
 | |
|         nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format(
 | |
|             upstream_worker_type=upstream_worker_type,
 | |
|             body=body,
 | |
|         )
 | |
| 
 | |
|     # Finally, we'll write out the config files.
 | |
| 
 | |
|     # log config for the master process
 | |
|     master_log_config = generate_worker_log_config(environ, "master", data_dir)
 | |
|     shared_config["log_config"] = master_log_config
 | |
| 
 | |
|     # Shared homeserver config
 | |
|     convert(
 | |
|         "/conf/shared.yaml.j2",
 | |
|         "/conf/workers/shared.yaml",
 | |
|         shared_worker_config=yaml.dump(shared_config),
 | |
|     )
 | |
| 
 | |
|     # Nginx config
 | |
|     convert(
 | |
|         "/conf/nginx.conf.j2",
 | |
|         "/etc/nginx/conf.d/matrix-synapse.conf",
 | |
|         worker_locations=nginx_location_config,
 | |
|         upstream_directives=nginx_upstream_config,
 | |
|     )
 | |
| 
 | |
|     # Supervisord config
 | |
|     os.makedirs("/etc/supervisor", exist_ok=True)
 | |
|     convert(
 | |
|         "/conf/supervisord.conf.j2",
 | |
|         "/etc/supervisor/supervisord.conf",
 | |
|         main_config_path=config_path,
 | |
|         worker_config=supervisord_config,
 | |
|     )
 | |
| 
 | |
|     # healthcheck config
 | |
|     convert(
 | |
|         "/conf/healthcheck.sh.j2",
 | |
|         "/healthcheck.sh",
 | |
|         healthcheck_urls=healthcheck_urls,
 | |
|     )
 | |
| 
 | |
|     # Ensure the logging directory exists
 | |
|     log_dir = data_dir + "/logs"
 | |
|     if not os.path.exists(log_dir):
 | |
|         os.mkdir(log_dir)
 | |
| 
 | |
| 
 | |
| def generate_worker_log_config(
 | |
|     environ: Mapping[str, str], worker_name: str, data_dir: str
 | |
| ) -> str:
 | |
|     """Generate a log.config file for the given worker.
 | |
| 
 | |
|     Returns: the path to the generated file
 | |
|     """
 | |
|     # Check whether we should write worker logs to disk, in addition to the console
 | |
|     extra_log_template_args = {}
 | |
|     if environ.get("SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK"):
 | |
|         extra_log_template_args["LOG_FILE_PATH"] = "{dir}/logs/{name}.log".format(
 | |
|             dir=data_dir, name=worker_name
 | |
|         )
 | |
|     # Render and write the file
 | |
|     log_config_filepath = "/conf/workers/{name}.log.config".format(name=worker_name)
 | |
|     convert(
 | |
|         "/conf/log.config",
 | |
|         log_config_filepath,
 | |
|         worker_name=worker_name,
 | |
|         **extra_log_template_args,
 | |
|     )
 | |
|     return log_config_filepath
 | |
| 
 | |
| 
 | |
| def main(args: List[str], environ: MutableMapping[str, str]) -> None:
 | |
|     config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
 | |
|     config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
 | |
|     data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
 | |
| 
 | |
|     # override SYNAPSE_NO_TLS, we don't support TLS in worker mode,
 | |
|     # this needs to be handled by a frontend proxy
 | |
|     environ["SYNAPSE_NO_TLS"] = "yes"
 | |
| 
 | |
|     # Generate the base homeserver config if one does not yet exist
 | |
|     if not os.path.exists(config_path):
 | |
|         log("Generating base homeserver config")
 | |
|         generate_base_homeserver_config()
 | |
| 
 | |
|     # This script may be run multiple times (mostly by Complement, see note at top of file).
 | |
|     # Don't re-configure workers in this instance.
 | |
|     mark_filepath = "/conf/workers_have_been_configured"
 | |
|     if not os.path.exists(mark_filepath):
 | |
|         # Always regenerate all other config files
 | |
|         generate_worker_files(environ, config_path, data_dir)
 | |
| 
 | |
|         # Mark workers as being configured
 | |
|         with open(mark_filepath, "w") as f:
 | |
|             f.write("")
 | |
| 
 | |
|     # Start supervisord, which will start Synapse, all of the configured worker
 | |
|     # processes, redis, nginx etc. according to the config we created above.
 | |
|     log("Starting supervisord")
 | |
|     os.execl(
 | |
|         "/usr/local/bin/supervisord",
 | |
|         "supervisord",
 | |
|         "-c",
 | |
|         "/etc/supervisor/supervisord.conf",
 | |
|     )
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     main(sys.argv, os.environ)
 |