mirror of
				https://github.com/matrix-org/synapse.git
				synced 2025-10-31 16:21:56 +01:00 
			
		
		
		
	Merge pull request #1864 from matrix-org/erikj/device_list_fixes
Fix clearing out old device list outbound pokes
This commit is contained in:
		
						commit
						ac001dabdc
					
				| @ -436,15 +436,27 @@ class DeviceStore(SQLBaseStore): | |||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|     def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): |     def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): | ||||||
|  |         # First we DELETE all rows such that only the latest row for each | ||||||
|  |         # (destination, user_id is left. We do this by selecting first and | ||||||
|  |         # deleting. | ||||||
|  |         sql = """ | ||||||
|  |             SELECT user_id, coalesce(max(stream_id), 0) FROM device_lists_outbound_pokes | ||||||
|  |             WHERE destination = ? AND stream_id <= ? | ||||||
|  |             GROUP BY user_id | ||||||
|  |             HAVING count(*) > 1 | ||||||
|  |         """ | ||||||
|  |         txn.execute(sql, (destination, stream_id,)) | ||||||
|  |         rows = txn.fetchall() | ||||||
|  | 
 | ||||||
|         sql = """ |         sql = """ | ||||||
|             DELETE FROM device_lists_outbound_pokes |             DELETE FROM device_lists_outbound_pokes | ||||||
|             WHERE destination = ? AND stream_id < ( |             WHERE destination = ? AND user_id = ? AND stream_id < ? | ||||||
|                 SELECT coalesce(max(stream_id), 0) FROM device_lists_outbound_pokes |  | ||||||
|                 WHERE destination = ? AND stream_id <= ? |  | ||||||
|             ) |  | ||||||
|         """ |         """ | ||||||
|         txn.execute(sql, (destination, destination, stream_id,)) |         txn.executemany( | ||||||
|  |             sql, ((destination, row[0], row[1],) for row in rows) | ||||||
|  |         ) | ||||||
| 
 | 
 | ||||||
|  |         # Mark everything that is left as sent | ||||||
|         sql = """ |         sql = """ | ||||||
|             UPDATE device_lists_outbound_pokes SET sent = ? |             UPDATE device_lists_outbound_pokes SET sent = ? | ||||||
|             WHERE destination = ? AND stream_id <= ? |             WHERE destination = ? AND stream_id <= ? | ||||||
| @ -545,18 +557,22 @@ class DeviceStore(SQLBaseStore): | |||||||
|         (destination, user_id) tuple to ensure that the prev_ids remain correct |         (destination, user_id) tuple to ensure that the prev_ids remain correct | ||||||
|         if the server does come back. |         if the server does come back. | ||||||
|         """ |         """ | ||||||
|         now = self._clock.time_msec() |         yesterday = self._clock.time_msec() - 24 * 60 * 60 * 1000 | ||||||
| 
 | 
 | ||||||
|         def _prune_txn(txn): |         def _prune_txn(txn): | ||||||
|             select_sql = """ |             select_sql = """ | ||||||
|                 SELECT destination, user_id, max(stream_id) as stream_id |                 SELECT destination, user_id, max(stream_id) as stream_id | ||||||
|                 FROM device_lists_outbound_pokes |                 FROM device_lists_outbound_pokes | ||||||
|                 GROUP BY destination, user_id |                 GROUP BY destination, user_id | ||||||
|  |                 HAVING min(ts) < ? AND count(*) > 1 | ||||||
|             """ |             """ | ||||||
| 
 | 
 | ||||||
|             txn.execute(select_sql) |             txn.execute(select_sql, (yesterday,)) | ||||||
|             rows = txn.fetchall() |             rows = txn.fetchall() | ||||||
| 
 | 
 | ||||||
|  |             if not rows: | ||||||
|  |                 return | ||||||
|  | 
 | ||||||
|             delete_sql = """ |             delete_sql = """ | ||||||
|                 DELETE FROM device_lists_outbound_pokes |                 DELETE FROM device_lists_outbound_pokes | ||||||
|                 WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ? |                 WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ? | ||||||
| @ -565,11 +581,13 @@ class DeviceStore(SQLBaseStore): | |||||||
|             txn.executemany( |             txn.executemany( | ||||||
|                 delete_sql, |                 delete_sql, | ||||||
|                 ( |                 ( | ||||||
|                     (now, row["destination"], row["user_id"], row["stream_id"]) |                     (yesterday, row[0], row[1], row[2]) | ||||||
|                     for row in rows |                     for row in rows | ||||||
|                 ) |                 ) | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|  |             logger.info("Pruned %d device list outbound pokes", txn.rowcount) | ||||||
|  | 
 | ||||||
|         return self.runInteraction( |         return self.runInteraction( | ||||||
|             "_prune_old_outbound_device_pokes", _prune_txn |             "_prune_old_outbound_device_pokes", _prune_txn | ||||||
|         ) |         ) | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user