How to read a line from Python that is currently being typed in the console?
I want to use a line of code that can read what I am typing in the console, and I use the asyncio
module in Python. The code prints data when it receives it from the server, and after that, I want to read what I was typing and save it to a variable. I am fine with having to use a non-standard module.
Information about my program:
- It currently reads stdin in a non-blocking way using asyncio's loop.run_in_executor.
- The program runs using loop.create_connection, and then uses loop.run_forever with the stdin reader added as a task.
code:
#!python3.5
# Chatroom client: connects to a server with asyncio, prints whatever the
# server sends, and forwards every line typed on stdin to the server.
import socket
import sys
import os
import select
import traceback
import asyncio
from threading import Lock

dbg = False  # more error printing using the full_error function

# async stuff
loop = asyncio.get_event_loop()
lock = Lock()
prev_stdin = ""  # text drained from stdin while a server message was printed

# server location
target = 'localhost'
port = 17532
buffer_size = 1024
server = None
transports = []  # a list of asyncio transports to send typed lines to


# functions for error handling
def full_error():
    """Print the traceback of the exception currently being handled."""
    traceback.print_exception(*sys.exc_info())


# pause, for system compatibility
if os.name == 'posix':
    def pause():
        """Wait for a single key press using the shell's ``read``."""
        os.system('read -n1 -r -p "Press any key to continue . . ." key')
elif os.name == 'nt':
    def pause():
        """Wait for a key press using the Windows ``pause`` command."""
        os.system("pause")
else:
    def pause():
        """Wait for the user to press enter."""
        input("Press enter to continue . . .")


def empty_stdin():
    """Return the text currently pending on stdin without blocking.

    The original loop compared the *bytes* returned by
    ``sys.stdin.buffer.read(1)`` with the *str* ``""``, so it could never
    terminate, and ``output += temp`` mixed str and bytes.  This version
    polls the file descriptor with a zero timeout and only reads what is
    already available.

    Returns:
        The pending input decoded as UTF-8 (undecodable bytes are
        replaced), or ``""`` when nothing is pending or stdin cannot be
        polled.
    """
    chunks = []
    try:
        fd = sys.stdin.fileno()
        while select.select([fd], [], [], 0)[0]:
            data = os.read(fd, buffer_size)
            if not data:  # EOF
                break
            chunks.append(data)
    except (OSError, ValueError):
        # Best effort: select() cannot poll a console handle on Windows, and
        # stdin may have no usable descriptor; treat both as "nothing pending".
        pass
    # NOTE(review): a terminal in its usual line-buffered mode presumably
    # only hands over text after Enter, so partially typed lines are likely
    # not visible here -- confirm on the target platform.
    return b"".join(chunks).decode('utf-8', errors='replace')


if dbg:
    loop.set_debug(dbg)


async def write():
    """Forward every line typed on stdin to all connected transports.

    ``sys.stdin.readline`` runs in the default executor so that waiting for
    input does not block the event loop.  Text saved in ``prev_stdin`` by
    ``Server.data_received`` is prepended to the next line that is sent.
    """
    # Without this declaration the assignment below makes prev_stdin local
    # and reading it first raises UnboundLocalError.
    global prev_stdin
    loop = asyncio.get_event_loop()
    while True:
        typed = await loop.run_in_executor(None, sys.stdin.readline)
        if not typed:
            # readline() returns "" at EOF; stop instead of spinning.
            return
        line = prev_stdin + typed
        prev_stdin = ""
        payload = line.encode('utf-8')
        for transport in transports:
            transport.write(payload)


class Server(asyncio.Protocol):
    """Protocol for the connection to the chat server."""

    def connection_made(self, transport):
        """Remember the transport and announce the connection."""
        self.transport = transport
        transport.write(b"connected")

    def data_received(self, data):
        """Print data from the server, then save and echo pending input."""
        # Must be global so that write() sees the saved text.
        global prev_stdin
        print("\n%s" % data.decode('utf-8'))
        prev_stdin = empty_stdin()
        print(prev_stdin,)


try:
    coro = loop.create_connection(Server, host=target, port=port)
    task = loop.create_task(coro)
    print("hi")
    print(transports)
    # create_connection yields a (transport, protocol) pair.
    server, serverp = loop.run_until_complete(task)
    transports.append(server)
    user_input = loop.create_task(write())
    #loop.add_reader(sys.stdin, write)
    loop.run_forever()
    print('end')
except Exception as e:
    if dbg:
        full_error()
    else:
        print(type(e))
        print(e)
pause()
sys.exit()
I solved my problem (on Windows only) by using msvcrt
to catch a single character at a time, along with some other code changes.
new functions/changes
draw_screen:
- clear the screen using
os.system("cls")
- print all the lines that I save whenever I print info (could be laggy)
- print the cursor and what is being typed (">> hello")
- clear screen using
write:
- replaced sys.stdin.readline with msvcrt.getch
- a lot more changes
everywhere:
- whenever I would print, I instead add the text to the
screen
variable, which is an array that holds everything, and then call draw_screen()
- removed the prev_stdin stuff
- removed other print statements
- whenever print, instead add
final code (for now):
#!python3.5
# Chatroom client (Windows only): reads the keyboard one key at a time with
# msvcrt and redraws the whole screen whenever something changes.
import sys
import os
import traceback
import asyncio
import msvcrt
from threading import Lock

dbg = False  # more error printing using the full_error function
screen = []  # holds the lines of text to be printed to the screen
line = ""    # the line of text currently being typed

# async stuff
loop = asyncio.get_event_loop()
lock = Lock()

# server location
target = 'localhost'
port = 17532
buffer_size = 1024
server = None
transports = []  # a list of asyncio transports to send typed lines to


# functions for error handling
def full_error():
    """Print the traceback of the exception currently being handled."""
    traceback.print_exception(*sys.exc_info())


# system compatible functions
if os.name == 'posix':
    def pause():
        """Wait for a single key press using the shell's ``read``."""
        os.system('read -n1 -r -p "Press any key to continue . . ." key')

    def cls():
        """Clear the terminal."""
        os.system("clear")
elif os.name == 'nt':
    def pause():
        """Wait for a key press using the Windows ``pause`` command."""
        os.system("pause")

    def cls():
        """Clear the console."""
        os.system("cls")
else:
    def pause():
        """Wait for the user to press enter."""
        input("Press enter to continue . . .")

    def cls():
        """Clear the terminal."""
        os.system("clear")


def draw_screen():
    """Clear the screen, then print every saved line and the input prompt."""
    cls()
    for text in screen:
        print(text)
    # Draws the current line of text being typed.  The prompt ends with "\r"
    # rather than a newline, so flush explicitly or it may stay in the
    # stdout buffer and never appear.
    print("> ", line, end="\r", flush=True)


if dbg:
    loop.set_debug(dbg)


async def write():
    """Collect key presses into ``line`` and send it when Enter is pressed.

    Keys are read in the default executor so that waiting for the keyboard
    does not block the event loop.  ``msvcrt.getwch`` is used instead of
    ``getch`` + ``decode('utf-8')``: ``getch`` returns a single raw byte,
    which raises UnicodeDecodeError for non-ASCII input and for the
    ``\\x00``/``\\xe0`` prefix of special keys.
    """
    global line
    while True:
        key = await loop.run_in_executor(None, msvcrt.getwch)
        if key in ("\x00", "\xe0") and msvcrt.kbhit():
            # Special keys (arrows, function keys) arrive as a prefix followed
            # by a key code that is already waiting; discard the code.
            msvcrt.getwch()
            continue
        if key == "\b":
            line = line[:-1]
        elif key in ("\r", "\n"):
            payload = line.encode('utf-8')
            for transport in transports:
                transport.write(payload)
            line = ""
        else:
            line += key
        draw_screen()


class Server(asyncio.Protocol):
    """Protocol for the connection to the chat server."""

    def connection_made(self, transport):
        """Remember the transport and announce the connection."""
        self.transport = transport
        transport.write(b"connected")

    def data_received(self, data):
        """Append the received text to the screen and redraw it."""
        screen.append("%s" % data.decode('utf-8'))
        draw_screen()


try:
    coro = loop.create_connection(Server, host=target, port=port)
    task = loop.create_task(coro)
    # create_connection yields a (transport, protocol) pair.
    server, serverp = loop.run_until_complete(task)
    transports.append(server)
    user_input = loop.create_task(write())
    loop.run_forever()
    print('end')
except Exception as e:
    if dbg:
        full_error()
    else:
        print(type(e))
        print(e)
pause()
sys.exit()
Comments
Post a Comment