hat.syslog.server.ui
Web server implementation
1"""Web server implementation""" 2 3import asyncio 4import contextlib 5import importlib 6import itertools 7import logging 8import urllib 9 10from hat import aio 11from hat import juggler 12 13from hat.syslog.server import common 14from hat.syslog.server import encoder 15import hat.syslog.server.backend 16 17 18mlog: logging.Logger = logging.getLogger(__name__) 19"""Module logger""" 20 21max_results_limit: int = 200 22"""Max results limit""" 23 24autoflush_delay: float = 0.2 25"""Juggler autoflush delay""" 26 27default_filter = common.Filter(max_results=max_results_limit) 28"""Default filter""" 29 30 31async def create_web_server(addr: str, 32 backend: hat.syslog.server.backend.Backend 33 ) -> 'WebServer': 34 """Create web server""" 35 srv = WebServer() 36 srv._backend = backend 37 srv._locks = {} 38 srv._filters = {} 39 40 exit_stack = contextlib.ExitStack() 41 try: 42 ui_path = exit_stack.enter_context( 43 importlib.resources.path(__package__, 'ui')) 44 45 url = urllib.parse.urlparse(addr) 46 srv._srv = await juggler.listen(host=url.hostname, 47 port=url.port, 48 connection_cb=srv._on_connection, 49 request_cb=srv._on_request, 50 static_dir=ui_path, 51 autoflush_delay=autoflush_delay) 52 53 try: 54 srv.async_group.spawn(aio.call_on_cancel, exit_stack.close) 55 56 except BaseException: 57 await aio.uncancellable(srv.async_close()) 58 raise 59 60 except BaseException: 61 exit_stack.close() 62 raise 63 64 mlog.debug("web server listening on %s", addr) 65 return srv 66 67 68class WebServer(aio.Resource): 69 70 @property 71 def async_group(self) -> aio.Group: 72 """Async group""" 73 return self._srv.async_group 74 75 async def _on_connection(self, conn): 76 try: 77 mlog.debug("new connection") 78 79 self._locks[conn] = asyncio.Lock() 80 self._filters[conn] = default_filter 81 82 change_queue = aio.Queue() 83 with self._backend.register_change_cb(change_queue.put_nowait): 84 async with self._locks[conn]: 85 prev_filter = self._filters[conn] 86 prev_filter_json = encoder.filter_to_json(prev_filter) 87 88 entries = await self._backend.query(prev_filter) 89 entries_json = [encoder.entry_to_json(entry) 90 for entry in entries] 91 92 conn.state.set([], {'filter': prev_filter_json, 93 'entries': entries_json, 94 'first_id': self._backend.first_id, 95 'last_id': self._backend.last_id}) 96 97 while True: 98 entries = await change_queue.get() 99 100 async with self._locks[conn]: 101 prev_filter = self._filters[conn] 102 prev_filter_json = conn.state.get('filter') 103 prev_entries_json = conn.state.get('entries') 104 105 previous_id = (prev_entries_json[0]['id'] 106 if prev_entries_json else 0) 107 entries = (entry for entry in entries 108 if entry.id > previous_id) 109 entries = _filter_entries(prev_filter, entries) 110 entries_json = [encoder.entry_to_json(entry) 111 for entry in entries] 112 113 if entries_json: 114 new_entries_json = itertools.chain( 115 entries_json, prev_entries_json) 116 new_entries_json = itertools.islice( 117 new_entries_json, prev_filter.max_results) 118 new_entries_json = list(new_entries_json) 119 120 else: 121 new_entries_json = prev_entries_json 122 123 conn.state.set([], {'filter': prev_filter_json, 124 'entries': new_entries_json, 125 'first_id': self._backend.first_id, 126 'last_id': self._backend.last_id}) 127 128 except Exception as e: 129 mlog.error("connection error: %s", e, exc_info=e) 130 131 finally: 132 mlog.debug("closing connection") 133 conn.close() 134 self._locks.pop(conn) 135 self._filters.pop(conn) 136 137 async def _on_request(self, conn, name, data): 138 
if name != 'filter': 139 raise Exception('invalid request name') 140 141 new_filter = encoder.filter_from_json(data) 142 new_filter = _sanitize_filter(new_filter) 143 144 async with self._locks[conn]: 145 prev_filter = self._filters[conn] 146 if new_filter == prev_filter: 147 return 148 149 mlog.debug('setting new filter: %s', new_filter) 150 new_filter_json = encoder.filter_to_json(new_filter) 151 152 entries = await self._backend.query(new_filter) 153 entries_json = [encoder.entry_to_json(entry) for entry in entries] 154 155 self._filters[conn] = new_filter 156 conn.state.set([], {'filter': new_filter_json, 157 'entries': entries_json, 158 'first_id': self._backend.first_id, 159 'last_id': self._backend.last_id}) 160 161 162def _sanitize_filter(f): 163 if f.max_results is None or f.max_results > max_results_limit: 164 f = f._replace(max_results=max_results_limit) 165 166 return f 167 168 169def _filter_entries(f, entries): 170 for i in entries: 171 if f.last_id is not None and i.id > f.last_id: 172 continue 173 174 if (f.entry_timestamp_from is not None 175 and i.timestamp < f.entry_timestamp_from): 176 continue 177 178 if (f.entry_timestamp_to is not None 179 and i.timestamp > f.entry_timestamp_to): 180 continue 181 182 if f.facility is not None and i.msg.facility != f.facility: 183 continue 184 185 if f.severity is not None and i.msg.severity != f.severity: 186 continue 187 188 if not _match_str_filter(f.hostname, i.msg.hostname): 189 continue 190 191 if not _match_str_filter(f.app_name, i.msg.app_name): 192 continue 193 194 if not _match_str_filter(f.procid, i.msg.procid): 195 continue 196 197 if not _match_str_filter(f.msgid, i.msg.msgid): 198 continue 199 200 if not _match_str_filter(f.msg, i.msg.msg): 201 continue 202 203 yield i 204 205 206def _match_str_filter(f, value): 207 return not f or f in value
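Every connection observes the same four top-level keys in its juggler state, written both on connect and on every backend change. A rough sketch of the state value follows; the filter keys mirror common.Filter, but the exact JSON layout of entries is defined by hat.syslog.server.encoder and is an assumption here.

# sketch of the client-visible juggler state; the nested entry layout is
# an assumption - hat.syslog.server.encoder defines the real encoding
state = {'filter': {'max_results': 200,        # capped by _sanitize_filter
                    'last_id': None,
                    'entry_timestamp_from': None,
                    'entry_timestamp_to': None,
                    'facility': None,
                    'severity': None,
                    'hostname': None,
                    'app_name': None,
                    'procid': None,
                    'msgid': None,
                    'msg': None},
         'entries': [],      # newest first, at most max_results items
         'first_id': None,   # id of oldest backend entry (None when empty)
         'last_id': None}    # id of newest backend entry (None when empty)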
mlog: logging.Logger
Module logger
max_results_limit: int = 200
Max results limit

autoflush_delay: float = 0.2
Juggler autoflush delay

default_filter = Filter(max_results=200, last_id=None, entry_timestamp_from=None, entry_timestamp_to=None, facility=None, severity=None, hostname=None, app_name=None, procid=None, msgid=None, msg=None)
Default filter
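Client-supplied filters are passed through the module-private _sanitize_filter before querying, so no connection can exceed max_results_limit. A minimal illustration of the clamping rule:

from hat.syslog.server import common
from hat.syslog.server.ui import _sanitize_filter   # module-private helper

# max_results above the limit, or unlimited (None), is clamped to 200
assert _sanitize_filter(common.Filter(max_results=1000)).max_results == 200
assert _sanitize_filter(common.Filter(max_results=None)).max_results == 200
assert _sanitize_filter(common.Filter(max_results=50)).max_results == 50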
async def create_web_server(addr: str, backend: hat.syslog.server.backend.Backend) -> hat.syslog.server.ui.WebServer:
Create web server
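A minimal usage sketch. How the Backend instance is obtained is elided here (see hat.syslog.server.backend), and the address and port are assumptions; addr is parsed with urllib.parse.urlparse, so it must carry a hostname and port.

import asyncio

from hat.syslog.server import ui

async def main():
    backend = ...  # a hat.syslog.server.backend.Backend instance
    srv = await ui.create_web_server('http://0.0.0.0:23020', backend)
    try:
        await srv.wait_closing()  # serve until closed
    finally:
        await srv.async_close()

asyncio.run(main())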
class WebServer(hat.aio.Resource):
Resource with lifetime control based on Group.
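The only request a connection may send is 'filter'; any other name raises an exception. Its payload is a JSON-encoded filter with the same keys as common.Filter. A hypothetical payload is sketched below; the value encodings (e.g. for severity) are assumptions, since encoder.filter_from_json defines the real decoding.

# hypothetical 'filter' request payload
request_name = 'filter'
request_data = {'max_results': 100,     # values above 200 get clamped
                'last_id': None,
                'entry_timestamp_from': None,
                'entry_timestamp_to': None,
                'facility': None,
                'severity': 'ERROR',    # enum encoding is an assumption
                'hostname': 'server',   # substring match (_match_str_filter)
                'app_name': None,
                'procid': None,
                'msgid': None,
                'msg': None}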
Inherited Members
- hat.aio.Resource
  - is_open
  - is_closing
  - is_closed
  - wait_closing
  - wait_closed
  - close
  - async_close
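All of these come from hat.aio.Resource and operate on the async_group that WebServer delegates to the underlying juggler server, so closing the resource also stops listening. A short lifecycle sketch (inside a coroutine; address and backend are assumptions, as in create_web_server):

srv = await ui.create_web_server('http://127.0.0.1:23020', backend)
assert srv.is_open

srv.close()              # request shutdown; stops the juggler server too
await srv.wait_closed()  # wait until fully closed
assert srv.is_closed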