mirror of
				https://github.com/matrix-org/synapse.git
				synced 2025-10-31 00:01:33 +01:00 
			
		
		
		
	Fix device list update to not constantly resync
This commit is contained in:
		
							parent
							
								
									e933a2712d
								
							
						
					
					
						commit
						36be39b8b3
					
				| @ -34,10 +34,11 @@ class DeviceHandler(BaseHandler): | |||||||
|         self.state = hs.get_state_handler() |         self.state = hs.get_state_handler() | ||||||
|         self.federation_sender = hs.get_federation_sender() |         self.federation_sender = hs.get_federation_sender() | ||||||
|         self.federation = hs.get_replication_layer() |         self.federation = hs.get_replication_layer() | ||||||
|         self._remote_edue_linearizer = Linearizer(name="remote_device_list") | 
 | ||||||
|  |         self._edu_updater = DeviceListEduUpdater(hs, self) | ||||||
| 
 | 
 | ||||||
|         self.federation.register_edu_handler( |         self.federation.register_edu_handler( | ||||||
|             "m.device_list_update", self._incoming_device_list_update, |             "m.device_list_update", self._edu_updater.incoming_device_list_update, | ||||||
|         ) |         ) | ||||||
|         self.federation.register_query_handler( |         self.federation.register_query_handler( | ||||||
|             "user_devices", self.on_federation_query_user_devices, |             "user_devices", self.on_federation_query_user_devices, | ||||||
| @ -299,58 +300,6 @@ class DeviceHandler(BaseHandler): | |||||||
|         # and those that actually still share a room with the user |         # and those that actually still share a room with the user | ||||||
|         defer.returnValue(users_who_share_room & possibly_changed) |         defer.returnValue(users_who_share_room & possibly_changed) | ||||||
| 
 | 
 | ||||||
|     @measure_func("_incoming_device_list_update") |  | ||||||
|     @defer.inlineCallbacks |  | ||||||
|     def _incoming_device_list_update(self, origin, edu_content): |  | ||||||
|         user_id = edu_content["user_id"] |  | ||||||
|         device_id = edu_content["device_id"] |  | ||||||
|         stream_id = edu_content["stream_id"] |  | ||||||
|         prev_ids = edu_content.get("prev_id", []) |  | ||||||
| 
 |  | ||||||
|         if get_domain_from_id(user_id) != origin: |  | ||||||
|             # TODO: Raise? |  | ||||||
|             logger.warning("Got device list update edu for %r from %r", user_id, origin) |  | ||||||
|             return |  | ||||||
| 
 |  | ||||||
|         rooms = yield self.store.get_rooms_for_user(user_id) |  | ||||||
|         if not rooms: |  | ||||||
|             # We don't share any rooms with this user. Ignore update, as we |  | ||||||
|             # probably won't get any further updates. |  | ||||||
|             return |  | ||||||
| 
 |  | ||||||
|         with (yield self._remote_edue_linearizer.queue(user_id)): |  | ||||||
|             # If the prev id matches whats in our cache table, then we don't need |  | ||||||
|             # to resync the users device list, otherwise we do. |  | ||||||
|             resync = True |  | ||||||
|             if len(prev_ids) == 1: |  | ||||||
|                 extremity = yield self.store.get_device_list_last_stream_id_for_remote( |  | ||||||
|                     user_id |  | ||||||
|                 ) |  | ||||||
|                 logger.info("Extrem: %r, prev_ids: %r", extremity, prev_ids) |  | ||||||
|                 if str(extremity) == str(prev_ids[0]): |  | ||||||
|                     resync = False |  | ||||||
| 
 |  | ||||||
|             if resync: |  | ||||||
|                 # Fetch all devices for the user. |  | ||||||
|                 result = yield self.federation.query_user_devices(origin, user_id) |  | ||||||
|                 stream_id = result["stream_id"] |  | ||||||
|                 devices = result["devices"] |  | ||||||
|                 yield self.store.update_remote_device_list_cache( |  | ||||||
|                     user_id, devices, stream_id, |  | ||||||
|                 ) |  | ||||||
|                 device_ids = [device["device_id"] for device in devices] |  | ||||||
|                 yield self.notify_device_update(user_id, device_ids) |  | ||||||
|             else: |  | ||||||
|                 # Simply update the single device, since we know that is the only |  | ||||||
|                 # change (becuase of the single prev_id matching the current cache) |  | ||||||
|                 content = dict(edu_content) |  | ||||||
|                 for key in ("user_id", "device_id", "stream_id", "prev_ids"): |  | ||||||
|                     content.pop(key, None) |  | ||||||
|                 yield self.store.update_remote_device_list_cache_entry( |  | ||||||
|                     user_id, device_id, content, stream_id, |  | ||||||
|                 ) |  | ||||||
|                 yield self.notify_device_update(user_id, [device_id]) |  | ||||||
| 
 |  | ||||||
|     @defer.inlineCallbacks |     @defer.inlineCallbacks | ||||||
|     def on_federation_query_user_devices(self, user_id): |     def on_federation_query_user_devices(self, user_id): | ||||||
|         stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) |         stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) | ||||||
| @ -376,3 +325,123 @@ def _update_device_from_client_ips(device, client_ips): | |||||||
|         "last_seen_ts": ip.get("last_seen"), |         "last_seen_ts": ip.get("last_seen"), | ||||||
|         "last_seen_ip": ip.get("ip"), |         "last_seen_ip": ip.get("ip"), | ||||||
|     }) |     }) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | class DeviceListEduUpdater(object): | ||||||
|  |     "Handles incoming device list updates from federation and updates the DB" | ||||||
|  | 
 | ||||||
|  |     def __init__(self, hs, device_handler): | ||||||
|  |         self.store = hs.get_datastore() | ||||||
|  |         self.federation = hs.get_replication_layer() | ||||||
|  |         self.clock = hs.get_clock() | ||||||
|  |         self.device_handler = device_handler | ||||||
|  | 
 | ||||||
|  |         self._remote_edue_linearizer = Linearizer(name="remote_device_list") | ||||||
|  | 
 | ||||||
|  |         # user_id -> list of updates waiting to be handled. | ||||||
|  |         self._pending_updates = {} | ||||||
|  | 
 | ||||||
|  |         # Recently seen stream ids. We don't bother keeping these in the DB, | ||||||
|  |         # but they're useful to have them about to reduce the number of spurious | ||||||
|  |         # resyncs. | ||||||
|  |         self._seen_updates = {} | ||||||
|  | 
 | ||||||
|  |     @defer.inlineCallbacks | ||||||
|  |     def incoming_device_list_update(self, origin, edu_content): | ||||||
|  |         """Called on incoming device list update from federation. Responsible | ||||||
|  |         for parsing the EDU and adding to pending updates list. | ||||||
|  |         """ | ||||||
|  | 
 | ||||||
|  |         user_id = edu_content.pop("user_id") | ||||||
|  |         device_id = edu_content.pop("device_id") | ||||||
|  |         stream_id = str(edu_content.pop("stream_id"))  # They may come as ints | ||||||
|  |         prev_ids = edu_content.pop("prev_id", []) | ||||||
|  |         prev_ids = [str(p) for p in prev_ids]   # They may come as ints | ||||||
|  | 
 | ||||||
|  |         if get_domain_from_id(user_id) != origin: | ||||||
|  |             # TODO: Raise? | ||||||
|  |             logger.warning("Got device list update edu for %r from %r", user_id, origin) | ||||||
|  |             return | ||||||
|  | 
 | ||||||
|  |         rooms = yield self.store.get_rooms_for_user(user_id) | ||||||
|  |         if not rooms: | ||||||
|  |             # We don't share any rooms with this user. Ignore update, as we | ||||||
|  |             # probably won't get any further updates. | ||||||
|  |             return | ||||||
|  | 
 | ||||||
|  |         self._pending_updates.setdefault(user_id, []).append( | ||||||
|  |             (device_id, stream_id, prev_ids, edu_content) | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |         yield self._handle_device_updates(user_id) | ||||||
|  | 
 | ||||||
|  |     @measure_func("_incoming_device_list_update") | ||||||
|  |     @defer.inlineCallbacks | ||||||
|  |     def _handle_device_updates(self, user_id): | ||||||
|  |         "Actually handle pending updates." | ||||||
|  | 
 | ||||||
|  |         with (yield self._remote_edue_linearizer.queue(user_id)): | ||||||
|  |             pending_updates = self._pending_updates.pop(user_id, []) | ||||||
|  |             if not pending_updates: | ||||||
|  |                 # This can happen since we batch updates | ||||||
|  |                 return | ||||||
|  | 
 | ||||||
|  |             resync = yield self._need_to_do_resync(user_id, pending_updates) | ||||||
|  | 
 | ||||||
|  |             if resync: | ||||||
|  |                 # Fetch all devices for the user. | ||||||
|  |                 origin = get_domain_from_id(user_id) | ||||||
|  |                 result = yield self.federation.query_user_devices(origin, user_id) | ||||||
|  |                 stream_id = result["stream_id"] | ||||||
|  |                 devices = result["devices"] | ||||||
|  |                 yield self.store.update_remote_device_list_cache( | ||||||
|  |                     user_id, devices, stream_id, | ||||||
|  |                 ) | ||||||
|  |                 device_ids = [device["device_id"] for device in devices] | ||||||
|  |                 yield self.device_handler.notify_device_update(user_id, device_ids) | ||||||
|  |             else: | ||||||
|  |                 # Simply update the single device, since we know that is the only | ||||||
|  |                 # change (becuase of the single prev_id matching the current cache) | ||||||
|  |                 for device_id, stream_id, prev_ids, content in pending_updates: | ||||||
|  |                     yield self.store.update_remote_device_list_cache_entry( | ||||||
|  |                         user_id, device_id, content, stream_id, | ||||||
|  |                     ) | ||||||
|  | 
 | ||||||
|  |                 yield self.device_handler.notify_device_update( | ||||||
|  |                     user_id, [device_id for device_id, _, _, _ in pending_updates] | ||||||
|  |                 ) | ||||||
|  | 
 | ||||||
|  |             self._seen_updates.setdefault(user_id, set()).update( | ||||||
|  |                 [stream_id for _, stream_id, _, _ in pending_updates] | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |     @defer.inlineCallbacks | ||||||
|  |     def _need_to_do_resync(self, user_id, updates): | ||||||
|  |         """Given a list of updates for a user figure out if we need to do a full | ||||||
|  |         resync, or whether we have enough data that we can just apply the delta. | ||||||
|  |         """ | ||||||
|  |         seen_updates = self._seen_updates.get(user_id, set()) | ||||||
|  | 
 | ||||||
|  |         extremity = yield self.store.get_device_list_last_stream_id_for_remote( | ||||||
|  |             user_id | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |         stream_id_in_updates = set()  # stream_ids in updates list | ||||||
|  |         for _, stream_id, prev_ids, _ in updates: | ||||||
|  |             if not prev_ids: | ||||||
|  |                 # We always do a resync if there are no previous IDs | ||||||
|  |                 defer.returnValue(True) | ||||||
|  | 
 | ||||||
|  |             for prev_id in prev_ids: | ||||||
|  |                 if prev_id == extremity: | ||||||
|  |                     continue | ||||||
|  |                 elif prev_id in seen_updates: | ||||||
|  |                     continue | ||||||
|  |                 elif prev_id in stream_id_in_updates: | ||||||
|  |                     continue | ||||||
|  |                 else: | ||||||
|  |                     defer.returnValue(True) | ||||||
|  | 
 | ||||||
|  |             stream_id_in_updates.add(stream_id) | ||||||
|  | 
 | ||||||
|  |         defer.returnValue(False) | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user