mirror of
				https://github.com/matrix-org/synapse.git
				synced 2025-10-26 13:51:04 +01:00 
			
		
		
		
	Remove fallback from get_missing_events.
get_missing_events used to fallback to fetching the missing events individually requesting from every server in the room, one by one.e This could be unacceptably slow, possibly causing #1732
This commit is contained in:
		
							parent
							
								
									0eac4fa525
								
							
						
					
					
						commit
						cc50b1ae53
					
				| @ -27,7 +27,6 @@ from synapse.util.caches.expiringcache import ExpiringCache | ||||
| from synapse.util.logutils import log_function | ||||
| from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred | ||||
| from synapse.events import FrozenEvent | ||||
| from synapse.types import get_domain_from_id | ||||
| import synapse.metrics | ||||
| 
 | ||||
| from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination | ||||
| @ -741,8 +740,6 @@ class FederationClient(FederationBase): | ||||
|             signed_events = yield self._check_sigs_and_hash_and_fetch( | ||||
|                 destination, events, outlier=False | ||||
|             ) | ||||
| 
 | ||||
|             have_gotten_all_from_destination = True | ||||
|         except HttpResponseException as e: | ||||
|             if not e.code == 400: | ||||
|                 raise | ||||
| @ -750,72 +747,6 @@ class FederationClient(FederationBase): | ||||
|             # We are probably hitting an old server that doesn't support | ||||
|             # get_missing_events | ||||
|             signed_events = [] | ||||
|             have_gotten_all_from_destination = False | ||||
| 
 | ||||
|         if len(signed_events) >= limit: | ||||
|             defer.returnValue(signed_events) | ||||
| 
 | ||||
|         users = yield self.state.get_current_user_in_room(room_id) | ||||
|         servers = set(get_domain_from_id(u) for u in users) | ||||
| 
 | ||||
|         servers = set(servers) | ||||
|         servers.discard(self.server_name) | ||||
| 
 | ||||
|         failed_to_fetch = set() | ||||
| 
 | ||||
|         while len(signed_events) < limit: | ||||
|             # Are we missing any? | ||||
| 
 | ||||
|             seen_events = set(earliest_events_ids) | ||||
|             seen_events.update(e.event_id for e in signed_events if e) | ||||
| 
 | ||||
|             missing_events = {} | ||||
|             for e in itertools.chain(latest_events, signed_events): | ||||
|                 if e.depth > min_depth: | ||||
|                     missing_events.update({ | ||||
|                         e_id: e.depth for e_id, _ in e.prev_events | ||||
|                         if e_id not in seen_events | ||||
|                         and e_id not in failed_to_fetch | ||||
|                     }) | ||||
| 
 | ||||
|             if not missing_events: | ||||
|                 break | ||||
| 
 | ||||
|             have_seen = yield self.store.have_events(missing_events) | ||||
| 
 | ||||
|             for k in have_seen: | ||||
|                 missing_events.pop(k, None) | ||||
| 
 | ||||
|             if not missing_events: | ||||
|                 break | ||||
| 
 | ||||
|             # Okay, we haven't gotten everything yet. Lets get them. | ||||
|             ordered_missing = sorted(missing_events.items(), key=lambda x: x[0]) | ||||
| 
 | ||||
|             if have_gotten_all_from_destination: | ||||
|                 servers.discard(destination) | ||||
| 
 | ||||
|             def random_server_list(): | ||||
|                 srvs = list(servers) | ||||
|                 random.shuffle(srvs) | ||||
|                 return srvs | ||||
| 
 | ||||
|             deferreds = [ | ||||
|                 preserve_fn(self.get_pdu)( | ||||
|                     destinations=random_server_list(), | ||||
|                     event_id=e_id, | ||||
|                 ) | ||||
|                 for e_id, depth in ordered_missing[:limit - len(signed_events)] | ||||
|             ] | ||||
| 
 | ||||
|             res = yield preserve_context_over_deferred( | ||||
|                 defer.DeferredList(deferreds, consumeErrors=True) | ||||
|             ) | ||||
|             for (result, val), (e_id, _) in zip(res, ordered_missing): | ||||
|                 if result and val: | ||||
|                     signed_events.append(val) | ||||
|                 else: | ||||
|                     failed_to_fetch.add(e_id) | ||||
| 
 | ||||
|         defer.returnValue(signed_events) | ||||
| 
 | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user