	Move FederationServer._handle_new_pdu to FederationHandler
Unfortunately this significantly increases the size of the already-rather-big FederationHandler, but the code fits more naturally here, and it paves the way for the tighter integration that I need between handling incoming PDUs and doing the join dance.

Other than renaming the existing `FederationHandler.on_receive_pdu` to `_process_received_pdu` to make way for it, this just consists of the move, and replacing `self.handler` with `self` and `self` with `self.replication_layer`.
This commit is contained in:

parent e8b1721290
commit 29235901b8
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -52,7 +52,6 @@ class FederationServer(FederationBase):
 
         self.auth = hs.get_auth()
 
-        self._room_pdu_linearizer = Linearizer("fed_room_pdu")
         self._server_linearizer = Linearizer("fed_server")
 
         # We cache responses to state queries, as they take a while and often
@@ -518,198 +517,7 @@ class FederationServer(FederationBase):
                 affected=pdu.event_id,
             )
 
-        yield self._handle_new_pdu(origin, pdu, get_missing=True)
+        yield self.handler.on_receive_pdu(origin, pdu, get_missing=True)
 
-    @defer.inlineCallbacks
-    @log_function
-    def _handle_new_pdu(self, origin, pdu, get_missing=True):
-        """ Process a PDU received via a federation /send/ transaction, or
-        via backfill of missing prev_events
-
-        Args:
-            origin (str): server which initiated the /send/ transaction. Will
-                be used to fetch missing events or state.
-            pdu (FrozenEvent): received PDU
-            get_missing (bool): True if we should fetch missing prev_events
-
-        Returns (Deferred): completes with None
-        """
-
-        # We reprocess pdus when we have seen them only as outliers
-        existing = yield self._get_persisted_pdu(
-            origin, pdu.event_id, do_auth=False
-        )
-
-        # FIXME: Currently we fetch an event again when we already have it
-        # if it has been marked as an outlier.
-
-        already_seen = (
-            existing and (
-                not existing.internal_metadata.is_outlier()
-                or pdu.internal_metadata.is_outlier()
-            )
-        )
-        if already_seen:
-            logger.debug("Already seen pdu %s", pdu.event_id)
-            return
-
-        state = None
-
-        auth_chain = []
-
-        have_seen = yield self.store.have_events(
-            [ev for ev, _ in pdu.prev_events]
-        )
-
-        fetch_state = False
-
-        # Get missing pdus if necessary.
-        if not pdu.internal_metadata.is_outlier():
-            # We only backfill backwards to the min depth.
-            min_depth = yield self.handler.get_min_depth_for_context(
-                pdu.room_id
-            )
-
-            logger.debug(
-                "_handle_new_pdu min_depth for %s: %d",
-                pdu.room_id, min_depth
-            )
-
-            prevs = {e_id for e_id, _ in pdu.prev_events}
-            seen = set(have_seen.keys())
-
-            if min_depth and pdu.depth < min_depth:
-                # This is so that we don't notify the user about this
-                # message, to work around the fact that some events will
-                # reference really really old events we really don't want to
-                # send to the clients.
-                pdu.internal_metadata.outlier = True
-            elif min_depth and pdu.depth > min_depth:
-                if get_missing and prevs - seen:
-                    # If we're missing stuff, ensure we only fetch stuff one
-                    # at a time.
-                    logger.info(
-                        "Acquiring lock for room %r to fetch %d missing events: %r...",
-                        pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
-                    )
-                    with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
-                        logger.info(
-                            "Acquired lock for room %r to fetch %d missing events",
-                            pdu.room_id, len(prevs - seen),
-                        )
-
-                        yield self._get_missing_events_for_pdu(
-                            origin, pdu, prevs, min_depth
-                        )
-
-            prevs = {e_id for e_id, _ in pdu.prev_events}
-            seen = set(have_seen.keys())
-            if prevs - seen:
-                logger.info(
-                    "Still missing %d events for room %r: %r...",
-                    len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
-                )
-                fetch_state = True
-
-        if fetch_state:
-            # We need to get the state at this event, since we haven't
-            # processed all the prev events.
-            logger.debug(
-                "_handle_new_pdu getting state for %s",
-                pdu.room_id
-            )
-            try:
-                state, auth_chain = yield self.get_state_for_room(
-                    origin, pdu.room_id, pdu.event_id,
-                )
-            except:
-                logger.exception("Failed to get state for event: %s", pdu.event_id)
-
-        yield self.handler.on_receive_pdu(
-            origin,
-            pdu,
-            state=state,
-            auth_chain=auth_chain,
-        )
-
-    @defer.inlineCallbacks
-    def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
-        """
-        Args:
-            origin (str): Origin of the pdu. Will be called to get the missing events
-            pdu: received pdu
-            prevs (str[]): List of event ids which we are missing
-            min_depth (int): Minimum depth of events to return.
-
-        Returns:
-            Deferred<dict(str, str?)>: updated have_seen dictionary
-        """
-        # We recalculate seen, since it may have changed.
-        have_seen = yield self.store.have_events(prevs)
-        seen = set(have_seen.keys())
-
-        if not prevs - seen:
-            # nothing left to do
-            defer.returnValue(have_seen)
-
-        latest = yield self.store.get_latest_event_ids_in_room(
-            pdu.room_id
-        )
-
-        # We add the prev events that we have seen to the latest
-        # list to ensure the remote server doesn't give them to us
-        latest = set(latest)
-        latest |= seen
-
-        logger.info(
-            "Missing %d events for room %r: %r...",
-            len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
-        )
-
-        # XXX: we set timeout to 10s to help workaround
-        # https://github.com/matrix-org/synapse/issues/1733.
-        # The reason is to avoid holding the linearizer lock
-        # whilst processing inbound /send transactions, causing
-        # FDs to stack up and block other inbound transactions
-        # which empirically can currently take up to 30 minutes.
-        #
-        # N.B. this explicitly disables retry attempts.
-        #
-        # N.B. this also increases our chances of falling back to
-        # fetching fresh state for the room if the missing event
-        # can't be found, which slightly reduces our security.
-        # it may also increase our DAG extremity count for the room,
-        # causing additional state resolution?  See #1760.
-        # However, fetching state doesn't hold the linearizer lock
-        # apparently.
-        #
-        # see https://github.com/matrix-org/synapse/pull/1744
-
-        missing_events = yield self.get_missing_events(
-            origin,
-            pdu.room_id,
-            earliest_events_ids=list(latest),
-            latest_events=[pdu],
-            limit=10,
-            min_depth=min_depth,
-            timeout=10000,
-        )
-
-        # We want to sort these by depth so we process them and
-        # tell clients about them in order.
-        missing_events.sort(key=lambda x: x.depth)
-
-        for e in missing_events:
-            yield self._handle_new_pdu(
-                origin,
-                e,
-                get_missing=False
-            )
-
-        have_seen = yield self.store.have_events(
-            [ev for ev, _ in pdu.prev_events]
-        )
-        defer.returnValue(have_seen)
-
     def __str__(self):
         return "<ReplicationLayer(%s)>" % self.server_name
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -31,7 +31,7 @@ from synapse.util.logcontext import (
 )
 from synapse.util.metrics import measure_func
 from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor
+from synapse.util.async import run_on_reactor, Linearizer
 from synapse.util.frozenutils import unfreeze
 from synapse.crypto.event_signing import (
     compute_event_signature, add_hashes_and_signatures,
@@ -79,12 +79,204 @@ class FederationHandler(BaseHandler):
 
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
+        self._room_pdu_linearizer = Linearizer("fed_room_pdu")
+
+    @defer.inlineCallbacks
+    @log_function
+    def on_receive_pdu(self, origin, pdu, get_missing=True):
+        """ Process a PDU received via a federation /send/ transaction, or
+        via backfill of missing prev_events
+
+        Args:
+            origin (str): server which initiated the /send/ transaction. Will
+                be used to fetch missing events or state.
+            pdu (FrozenEvent): received PDU
+            get_missing (bool): True if we should fetch missing prev_events
+
+        Returns (Deferred): completes with None
+        """
+
+        # We reprocess pdus when we have seen them only as outliers
+        existing = yield self.get_persisted_pdu(
+            origin, pdu.event_id, do_auth=False
+        )
+
+        # FIXME: Currently we fetch an event again when we already have it
+        # if it has been marked as an outlier.
+
+        already_seen = (
+            existing and (
+                not existing.internal_metadata.is_outlier()
+                or pdu.internal_metadata.is_outlier()
+            )
+        )
+        if already_seen:
+            logger.debug("Already seen pdu %s", pdu.event_id)
+            return
+
+        state = None
+
+        auth_chain = []
+
+        have_seen = yield self.store.have_events(
+            [ev for ev, _ in pdu.prev_events]
+        )
+
+        fetch_state = False
+
+        # Get missing pdus if necessary.
+        if not pdu.internal_metadata.is_outlier():
+            # We only backfill backwards to the min depth.
+            min_depth = yield self.get_min_depth_for_context(
+                pdu.room_id
+            )
+
+            logger.debug(
+                "_handle_new_pdu min_depth for %s: %d",
+                pdu.room_id, min_depth
+            )
+
+            prevs = {e_id for e_id, _ in pdu.prev_events}
+            seen = set(have_seen.keys())
+
+            if min_depth and pdu.depth < min_depth:
+                # This is so that we don't notify the user about this
+                # message, to work around the fact that some events will
+                # reference really really old events we really don't want to
+                # send to the clients.
+                pdu.internal_metadata.outlier = True
+            elif min_depth and pdu.depth > min_depth:
+                if get_missing and prevs - seen:
+                    # If we're missing stuff, ensure we only fetch stuff one
+                    # at a time.
+                    logger.info(
+                        "Acquiring lock for room %r to fetch %d missing events: %r...",
+                        pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
+                    )
+                    with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
+                        logger.info(
+                            "Acquired lock for room %r to fetch %d missing events",
+                            pdu.room_id, len(prevs - seen),
+                        )
+
+                        yield self._get_missing_events_for_pdu(
+                            origin, pdu, prevs, min_depth
+                        )
+
+            prevs = {e_id for e_id, _ in pdu.prev_events}
+            seen = set(have_seen.keys())
+            if prevs - seen:
+                logger.info(
+                    "Still missing %d events for room %r: %r...",
+                    len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+                )
+                fetch_state = True
+
+        if fetch_state:
+            # We need to get the state at this event, since we haven't
+            # processed all the prev events.
+            logger.debug(
+                "_handle_new_pdu getting state for %s",
+                pdu.room_id
+            )
+            try:
+                state, auth_chain = yield self.replication_layer.get_state_for_room(
+                    origin, pdu.room_id, pdu.event_id,
+                )
+            except:
+                logger.exception("Failed to get state for event: %s", pdu.event_id)
+
+        yield self._process_received_pdu(
+            origin,
+            pdu,
+            state=state,
+            auth_chain=auth_chain,
+        )
+
+    @defer.inlineCallbacks
+    def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
+        """
+        Args:
+            origin (str): Origin of the pdu. Will be called to get the missing events
+            pdu: received pdu
+            prevs (str[]): List of event ids which we are missing
+            min_depth (int): Minimum depth of events to return.
+
+        Returns:
+            Deferred<dict(str, str?)>: updated have_seen dictionary
+        """
+        # We recalculate seen, since it may have changed.
+        have_seen = yield self.store.have_events(prevs)
+        seen = set(have_seen.keys())
+
+        if not prevs - seen:
+            # nothing left to do
+            defer.returnValue(have_seen)
+
+        latest = yield self.store.get_latest_event_ids_in_room(
+            pdu.room_id
+        )
+
+        # We add the prev events that we have seen to the latest
+        # list to ensure the remote server doesn't give them to us
+        latest = set(latest)
+        latest |= seen
+
+        logger.info(
+            "Missing %d events for room %r: %r...",
+            len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+        )
+
+        # XXX: we set timeout to 10s to help workaround
+        # https://github.com/matrix-org/synapse/issues/1733.
+        # The reason is to avoid holding the linearizer lock
+        # whilst processing inbound /send transactions, causing
+        # FDs to stack up and block other inbound transactions
+        # which empirically can currently take up to 30 minutes.
+        #
+        # N.B. this explicitly disables retry attempts.
+        #
+        # N.B. this also increases our chances of falling back to
+        # fetching fresh state for the room if the missing event
+        # can't be found, which slightly reduces our security.
+        # it may also increase our DAG extremity count for the room,
+        # causing additional state resolution?  See #1760.
+        # However, fetching state doesn't hold the linearizer lock
+        # apparently.
+        #
+        # see https://github.com/matrix-org/synapse/pull/1744
+
+        missing_events = yield self.replication_layer.get_missing_events(
+            origin,
+            pdu.room_id,
+            earliest_events_ids=list(latest),
+            latest_events=[pdu],
+            limit=10,
+            min_depth=min_depth,
+            timeout=10000,
+        )
+
+        # We want to sort these by depth so we process them and
+        # tell clients about them in order.
+        missing_events.sort(key=lambda x: x.depth)
+
+        for e in missing_events:
+            yield self.on_receive_pdu(
+                origin,
+                e,
+                get_missing=False
+            )
+
+        have_seen = yield self.store.have_events(
+            [ev for ev, _ in pdu.prev_events]
+        )
+        defer.returnValue(have_seen)
 
     @log_function
     @defer.inlineCallbacks
-    def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None):
-        """ Called by the ReplicationLayer when we have a new pdu. We need to
-        do auth checks and put it through the StateHandler.
+    def _process_received_pdu(self, origin, pdu, state=None, auth_chain=None):
+        """ Called when we have a new pdu. We need to do auth checks and put it
+        through the StateHandler.
 
         auth_chain and state are None if we already have the necessary state
         and prev_events in the db
@@ -738,7 +930,7 @@ class FederationHandler(BaseHandler):
                     continue
 
                 try:
-                    self.on_receive_pdu(origin, p)
+                    self._process_received_pdu(origin, p)
                 except:
                     logger.exception("Couldn't handle pdu")
 
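A note on the concurrency pattern the moved code relies on: `self._room_pdu_linearizer.queue(pdu.room_id)` serializes missing-event fetches per room, so concurrent /send transactions for the same room take turns rather than fetching in parallel. The following is a rough sketch of that pattern only, not synapse's Linearizer (which is Twisted-based and yields a context manager from a Deferred); `ToyLinearizer`, `fetch_missing_events`, and `main` are hypothetical names for illustration:

    import asyncio
    from collections import defaultdict


    class ToyLinearizer:
        """Toy stand-in for synapse.util.async.Linearizer: one lock per key,
        so work queued under the same key (here, a room_id) runs serially."""

        def __init__(self, name):
            self.name = name
            self._locks = defaultdict(asyncio.Lock)

        def queue(self, key):
            # The real Linearizer yields a context manager from a Deferred;
            # an asyncio.Lock plays that role in this sketch.
            return self._locks[key]


    async def fetch_missing_events(linearizer, room_id):
        async with linearizer.queue(room_id):
            # Only one fetch per room at a time, mirroring the "ensure we
            # only fetch stuff one at a time" comment in the diff above.
            await asyncio.sleep(0)  # placeholder for the actual network fetch


    async def main():
        lin = ToyLinearizer("fed_room_pdu")
        # Concurrent calls for the same room are serialized by the lock.
        await asyncio.gather(
            fetch_missing_events(lin, "!room:example.org"),
            fetch_missing_events(lin, "!room:example.org"),
        )


    asyncio.run(main())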