People use it
Lots of university students' projects tend to be Python, so non-industrial software abounds.
eg https://pypi.org/project/attractors apparently makes these babies:
Discovering the purpose of that asap. A topology for posing philo on?
trades brackets for other brackets
laid-back if etc syntax, and tense property access
ind = ' '
if out["std"] == 'err':
    ind = '!! '
out = {} above is a dictionary. The out.std syntax is only for object attributes, not dictionary entries,
so all dictionary property access gets quite ["brackety"].
Also, if a key doesn't exist, reading it is fatal (KeyError), so you write stuff like:
# should happen over there too
if 'ssh_around' in job:
    cmd = job['ssh_around']+' '+json.dumps(cmd)
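There are softer accessors if you'd rather not test-then-index every time; a sketch (the job contents here are made up):

job = {"command": "./serve.pl"}
# job["ssh_around"] would be fatal: KeyError: 'ssh_around'
around = job.get("ssh_around", "")   # default if the key is absent
if "ssh_around" in job:              # or test first, as above
    around = job["ssh_around"]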
boilerplate
#!/usr/bin/env python3
import os
import time
import concurrent.futures
import threading
import subprocess
import sys
import re
import json
from pathlib import Path
import zap_parser
import zap_ui
import pprint
def dd(data,depth=7):
    pp = pprint.PrettyPrinter(depth=depth)
    pp.pprint(data)
dd() is leftover from development, for when you need to visually verify data structures.
threading sends a function off on its own timeline, as if it were a separate Python program sharing memory.
threading.Thread(target=all_systems_go).start()
...
threading.Thread(target=check_jobs).start()
...
zap_ui.begin(i_job,job_i,systems)
this allows the job runner bunch-of-events to not be blocked by the UI thread doing this:
# Event loop
while True:
    key = stdscr.getch()
    ...
    time.sleep(0.1)
For non-blocking stdscr.getch(), the loop must sleep.
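That only works because getch() was told not to block; a minimal sketch of the whole arrangement (assuming the usual curses.wrapper entry point, not necessarily what zap does):

import curses, time

def ui(stdscr):
    stdscr.nodelay(True)      # getch() now returns -1 instead of blocking
    while True:
        key = stdscr.getch()
        if key == ord('q'):
            break
        time.sleep(0.1)       # otherwise this loop pins a cpu

curses.wrapper(ui)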
There's also concurrent.futures, apparently the way a list of jobs wants to run...
def all_systems_go():
    # < figure out if any of this can be less terrifying
    # max_workers so that all jobs can stay happening
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit each command to the executor
        future_results = []
        for system in systems:
            jobs = system['jobs']
            for job in jobs:
                future_results.append(executor.submit(run_job, job))
        # Process the results as they become available
        for future in concurrent.futures.as_completed(future_results):
            result = future.result()
It can take max_workers. Apparently this is better than hand-rolling more threads, though it may be one reason "graceful shutdown" doesn't seem to work.
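Honouring that max_workers comment would mean actually passing it in, something like this sketch (sizing the pool to the job count so nothing queues behind a long-runner):

all_jobs = [job for system in systems for job in system['jobs']]
# one worker per job: all jobs can stay happening
with concurrent.futures.ThreadPoolExecutor(max_workers=len(all_jobs)) as executor:
    future_results = [executor.submit(run_job, job) for job in all_jobs]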
Why? podman rm -f often required after podman run
It seems not to do graceful shutdown, so let's gracefully automate sweeping the crash site:
def fixup_for_podmanrun_job(job,out):
    line = out['s']
    if out["std"] == "err":
        if m := re.search(r'the container name "(\S+)" is already in use', line):
            run_fixup(job,'podman rm -f {}'.format(m.group(1)))
Automatic error message reader and command typer!
Being able to innovate on this last mile of computer system is great.
If everyone screencast their entire computer experience, we could probably map the jagged bits of cave, and deliver neat flicks to the blubber that near thrust upon them.
ansi colour codes in curses terminals
ansi colours
Are just there in terminal program output sometimes, pipe them to xxd (or less) to investigate.
Or you can code them two ways:
from colorama import Fore, Back, Style
print(Fore.RED + 'some red text')
print("\x1b\x5b32mSweet\x1b\x5b37m etc")
curses
Putting strings on the screen looks like this:
stdscr.addstr(i, 0, "["+str(job["i"])+"] "+dotdotdotat(job["t"], cols - 23))
This doesn't like ansi colours.
Curses also has an 80s-style colour api. This library would almost do, but it only does 5 terminal colours, and the presence of others is fatal.
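The 80s api in question, roughly (a sketch; start_color() must come after initscr, and pairs get registered by number before use):

curses.start_color()
curses.use_default_colors()                # lets -1 mean the terminal's default
curses.init_pair(1, curses.COLOR_RED, -1)  # pair 1: red on default background
stdscr.addstr(0, 0, "alarming", curses.color_pair(1))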
less
So we shell out to less to look at the job output:
less_process = subprocess.Popen(["less", "-R", "+F", tmp.name])
...
stdscr = curses.initscr()
less -R presents ansi colours perfectly, and +F means follow|tail.
You Ctrl+C less to get back to the UI, provided it handled the SIGINT, which also always takes out lsyncd (but none of the other jobs; we auto-restart it!)
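Ctrl+C delivers SIGINT to the whole foreground process group, which is presumably how lsyncd keeps catching strays. One escape might be giving each job its own process group (an assumption, not what zap currently does):

process = subprocess.Popen(zap_command, shell=True,
    start_new_session=True,   # own process group: the UI's Ctrl+C can't reach it
    stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)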
Evaluation of Default Arguments
Python default arguments are evaluated once, when the function definition is encountered. This means eg memo is shared by all calls.
def fib_memo(n, memo={0:0, 1:1}):
    """
    n is the nth number
    you would like returned from the sequence
    """
    if not n in memo:
        memo[n] = fib_memo(n-1) + fib_memo(n-2)
    return memo[n]

fib_memo(6)
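Here the sharing is the whole trick: every call, even the recursive ones, hits the same warm memo. The same rule is more often met as a trap (the standard demonstration, not from zap):

def push(x, acc=[]):  # one list, created once at def time
    acc.append(x)
    return acc

push(1)          # [1]
push(2)          # [1, 2]  <- same list as the last call
push(3, acc=[])  # [3], passing a fresh list dodges it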
assign in expr
if result := some_method():
    print(result)
commutes
a, b = b, a
nonlocal
The way hence, belonging to the freq() lexical scope, drifts into when():
# for a loop full of if branches going off at different intervals
def freq(hz):
    hence = 0
    period = 1/hz
    def when():
        nonlocal hence
        if time.time() - hence > period:
            hence = time.time()
            return 1
    return when
Without nonlocal, Python first assumes these are two unrelated hence variables, then in this example complains that the inner one is referenced before any assignment in when():
UnboundLocalError: local variable 'hence' referenced before assignment
But it might be subtler for you! I can't remember how but something else can go wrong too. Soz.
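Anyway, usage looks something like this sketch (the branch bodies are hypothetical):

every_second = freq(1)
ten_hz = freq(10)
while True:
    if every_second():
        poll_jobs()        # hypothetical slow work
    if ten_hz():
        redraw_labels()    # hypothetical fast work
    time.sleep(0.01)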
how zap runs jobs
def run_job(job,actual_cmd=None,sleepytime=None):
    i = job["i"]
    command = actual_cmd or job["command"]
    def diag(s):
        #print(s)
        1
    if sleepytime:
        # job.restart should not burn cpu
        time.sleep(2)
    if actual_cmd:
        diag(f"[{i}] other: "+ command)
    else:
        diag(f"[{i}] starts: "+ job["t"])
    # the ui shall tail this
    # [{std:'out',s:'hello\n',time:...}+]
    if not 'output' in job:
        job["output"] = []
    # downstreams to have in this thread eg fixup
    job["listen_out"] = []
    # attach GOFAI fixup actuators
    # you can fixup fixups too so we pass command
    give_job_fixup(job,command)
    # json strings are shell-compatible
    # in perl this will be $ARGV[0], not needing decode
    zap_command = zap_run_path+" "+json.dumps(command)
    process = job["process"] = subprocess.Popen(zap_command, shell=True,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=0)
    def readaline(ch,std):
        linesing = iter(std.readline, "")
        for line in linesing:
            # < do we want the ^ + and \n$
            # seem to do \n before turning off TerminalColors, just remove it
            line = re.sub("\n","",line)
            out = iout(job,ch,line.strip())
            diag(f"[{i}] std{ch}: "+out["s"])
            # downstreams to have in this thread eg fixup
            for cb in job["listen_out"]:
                cb(job,out)
    readaline('out',process.stdout)
    readaline('err',process.stderr)
    # stdin
    def inN(N):
        if not "wrote_stdin" in job:
            job["wrote_stdin"] = 0
        for l in N:
            job["wrote_stdin"] = job["wrote_stdin"] + 1
            process.stdin.write(l + "\n")
            print("Wrote to "+job["t"]+": '"+l+"'")
            # flush reduces latency and the need to \n$, but the receiver might be waiting for one?
            process.stdin.flush()
    job["give_stdin"] = inN
    # < multiple sudos in a command should work
    #   password fed only the first time
    if 0 and "sudostdin" and 'sudo ' in command:
        host = job["on_host"]
        if not host:
            raise ValueError("only know vms")
        file_path = Path("secrets/sudo-on-"+host)
        password = file_path.read_text()
        if not password:
            raise ValueError("dont know "+str(file_path))
        #
        time.sleep(0.4)
        inN([password])
    job["exit_code"] = None
    def check1s():
        exit_code = process.poll()
        if exit_code is not None:
            diag(f"[{i}] trouble! code:"+str(exit_code))
            job["exit_code"] = exit_code
            diag(f"[{i}] finito")
            job["check1s"] = lambda: 1
            # when the job produces an error code?
            if 'restart' in job:
                iout(job,'fix'," ↺ job restart")
                # we are the check_jobs thread currently
                # Create a new thread and call run_job(job) within that thread
                threading.Thread(target=run_job, args=(job,None,'sleepy')).start()
    job["check1s"] = check1s
So fixups observe output through that for cb in job["listen_out"] loop, and when sprung they call run_job() again, reusing the job object (whose output log tells of all this) to run some other actual_cmd.
For some reason job["exit_code"] gets stuck once set... The job restarting doesn't really return it to None. Perhaps data is getting weird between threads.
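If threads are the culprit, the boring diagnosis would be a per-job lock around the writes; a sketch, not something zap does yet:

job["lock"] = threading.Lock()

# wherever exit_code is written or reset:
with job["lock"]:
    job["exit_code"] = exit_code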
The Wakecycle of a Job
Initially parse_cmd_source(L) parses this set of system/job/cmd:
# style_dev
cd ~/stylehouse
./serve.pl
# letz_dev
lsyncd py/letz.lsyncdconf
echo yep
%restart
# this seems to get a SIGINT from the UI doing less
# fast code deployment over ssh
# inotify on s: -> replication -> inotify on gox (-> vite etc)
# unlike sshfs ~~ ftp needing ongoing ls
ssh -A gox
sshfs s:/media/s/Elvis/Photo v
# s is 192.168.122.1, virbr0 on sa
# v is the mount at gox:~s/v, goes into:
ssh gox
cd src/letz
podman run -v ~/v:/v:ro -v .:/app:exec -p 5000:5000 --rm -it --name pyt py bash -c './yt.sh'
Then a bit of def create_job_title(job,cmds):
match = re.search(r'^ssh .*?(\w+)$', cmd)
if match:
    job["on_host"] = match.group(1)
    titles.append(match.group(1)+':')
    continue
match = re.search(r'^podman run .* --name (\S+) (\S+) ?((\w+ ?)+)?', cmd)
if match:
    titles.append(match.group(2)+'->'+match.group(1))
    if match.group(3):
        titles.append(match.group(3).strip())
    continue
The job/cmd hierarchy joins bits of title for each job from its many cmds.
Those then get the following job titles:
- serve.pl
- py/letz.lsyncdconf echo yep
- sshfs s:...Photo v
- py->pyt bash
Yay! That sshfs should be gox: sshfs s:...Photo v but we'll call that good.
more threads
from the UI thread
You might spin up a job thread off the ui thread from the keypress dispatcher:
elif key == ord('R') or key == ord('r'):
    job = i_job[selected_row]
    threading.Thread(target=restart_job, args=[job]).start()
    stdscr.addstr(rows-2, 2, "job restarting")
If you forgot: args=[job]
And forgot: args=(job,)
And just put: args=(job)
It becomes: args=job
ie job.* become too many arguments:
TypeError: thred() takes 1 positional argument but 5 were given
Pretty silly, the list-not-array design here in the args department...
Standardise on args=[...]
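The mechanics, demonstrated (a sketch; Thread just iterates whatever args is, and a dict iterates to its keys; the job contents are made up):

def thred(job):
    pass

job = {"i": 0, "t": "x", "command": "x", "output": [], "process": None}  # 5 keys
threading.Thread(target=thred, args=job).start()    # 5 keys -> 5 positional args -> TypeError, inside the thread
threading.Thread(target=thred, args=[job]).start()  # one element: the job itself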
to a new job thread
This is imported to zap_ui.py from zap_job.py:
def restart_job(job):
    # stop what we were doing
    job['process'].terminate()
    # backflip logo in job label
    remark_job_ui(job,"↺")
    # and marked in job output
    iout(job,'fix'," ↺ manual restart")
    run_job(job)
The backflip logo must be tidied away after a while
# 3s remark drawn in draw_job_label()
def remark_job_ui(job,say):
    def later(say):
        time.sleep(2)
        # WRONG:
        if job["notice"] == say:
            del job["notice"]
    job["notice"] = say
    threading.Thread(target=later, args=[say]).start()
Then, DRAMA! When holding down the 'R' key:
if job["notice"] == say:
KeyError: 'notice'
If the assignment now is + and the deletion later is -, this goes ++++----, yet only the first - has any job["notice"] to delete.
Which brings us neatly back to the beginning of this write-up, on property access:
def later(say):
    time.sleep(phi)
    if "notice" in job and job["notice"] == say:
        del job["notice"]
Ta.