This is a read-only snapshot of the ComputerCraft forums, taken in April 2020.
AnotherName's profile picture

events lost in my coroutine manager

Started by AnotherName, 02 July 2016 - 05:12 PM
AnotherName #1
Posted 02 July 2016 - 07:12 PM
I was trying to make my own coroutine manager to have logic multitasking. But when I use os.pullEvent() on my tasks, it doesn't work well. Seems that it lost many events. I don't know what is wrong, or how to solve.

As example in my test when i try to write, it only print some chars of that I typed

There is my code (split in three files):

pseudOS_Concurrent_Planifier
Spoiler

local oTask_Running=nil
local tPrepared={{},{},{},{},{}}				   --VH,H,M,L,VL FiFO Queues, element{task,event}
local tEventWaiting={}		--FIFO Queue, element {task, eventFilter}
local tLastQueueCalled={0,0,0,0,0}		--VH,H,M,L,VL counts the number of turns without execute a task of a queue.
local tLastQueueCalledConst={0,5,7,9,11}   --indicates the number of turns wich have to pass, to force the execution of next task of the queue [it ensures progression], the values must be > #tLastQueueCalledConst and two more than previous at least
---------------------------------------------------------
---------------------------------------------------------
local function runNextTask()
local bOk=nil
local sCtrlMsg=nil
local tQueue=nil
local tTask=nil
local sParam=nil
--select the next task queue
if tLastQueueCalled[5]>=tLastQueueCalledConst[5] and #tPrepared[5]>0 then tQueue=tPrepared[5] tLastQueueCalled[5]=-1
elseif tLastQueueCalled[4]>=tLastQueueCalledConst[4] and  #tPrepared[4]>0 then tQueue=tPrepared[4] tLastQueueCalled[4]=-1
elseif tLastQueueCalled[3]>=tLastQueueCalledConst[3] and #tPrepared[3]>0 then tQueue=tPrepared[3] tLastQueueCalled[3]=-1
elseif tLastQueueCalled[2]>=tLastQueueCalledConst[2] and #tPrepared[2]>0 then tQueue=tPrepared[2] tLastQueueCalled[2]=-1
else
  if  #tPrepared[1]>0 then tQueue=tPrepared[1] tLastQueueCalled[1]=-1
  elseif  #tPrepared[2]>0 then tQueue=tPrepared[2] tLastQueueCalled[2]=-1
  elseif  #tPrepared[3]>0 then tQueue=tPrepared[3] tLastQueueCalled[3]=-1
  elseif  #tPrepared[4]>0 then tQueue=tPrepared[4] tLastQueueCalled[4]=-1
  elseif  #tPrepared[5]>0 then tQueue=tPrepared[5] tLastQueueCalled[5]=-1
  else return #tEventWaiting>0 end
end
--increase the number of all turns
for i=1, #tLastQueueCalled do
  tLastQueueCalled[i]=tLastQueueCalled[i]+1
end
--pull task form queue, run and queue again if is necesary
tTask=table.remove(tQueue,1)
oTask_Running=tTask[1]
tTask[1].setState(pseudOS.Concurrent.Task.STATE_RUNNING)
bOK,sCtrlMsg, sParam=coroutine.resume(tTask[1].getCoroutine(), unpack(tTask[2]))
oTask_Running=nil
if not bOK then
  print("error with task "..tTask[1].getID()..", task killed and terminated. Internal error: "..sCtrlMsg)
  tTask[1].setState(pseudOS.Concurrent.Task.STATE_ERRORED)
  --[TODO]check sons, and if are deamon interrupt them
end
if coroutine.status(tTask[1].getCoroutine())~="dead" then
  if sCtrlMsg=="ctrl_msg-Prepare" then
   if (tTask[1].getPriority()==pseudOS.Concurrent.Task.PRIORITY_HIGHEST) then tPrepared[1][#tPrepared[1]+1]={tTask[1], {nil}}
   elseif  (tTask[1].getPriority()==pseudOS.Concurrent.Task.PRIORITY_HIGH) then tPrepared[2][#tPrepared[2]+1]={tTask[1], {nil}}
   elseif  (tTask[1].getPriority()==pseudOS.Concurrent.Task.PRIORITY_MEDIUM) then tPrepared[3][#tPrepared[3]+1]={tTask[1], {nil}}
   elseif  (tTask[1].getPriority()==pseudOS.Concurrent.Task.PRIORITY_LOW) then tPrepared[4][#tPrepared[4]+1]={tTask[1], {nil}}
   elseif  (tTask[1].getPriority()==pseudOS.Concurrent.Task.PRIORITY_LOWEST) then tPrepared[5][#tPrepared[5]+1]={tTask[1], {nil}}
   end
   tTask[1].setState(pseudOS.Concurrent.Task.STATE_PREPARED)
   os.queueEvent("ctrl_msg-TopWaiting")
  else
   tTask[1].setState(pseudOS.Concurrent.Task.STATE_SUSPENDED)
   tEventWaiting[#tEventWaiting+1]= {tTask[1],sCtrlMsg}
  end
else
   tTask[1].setState(pseudOS.Concurrent.Task.STATE_DEAD)
  os.queueEvent("task_terminated",tTask[1].getID())
  --[TODO]check sons, and if are deamon interrupt them
end
return true
end
local function setPrepared(_task, _event)
local task=_task or nil
local event=_event or nil
if event==nil then event={nil} end
if (_task.getPriority()==pseudOS.Concurrent.Task.PRIORITY_HIGHEST) then tPrepared[1][#tPrepared[1]+1]={task, event}
elseif  (_task.getPriority()==pseudOS.Concurrent.Task.PRIORITY_HIGH) then tPrepared[2][#tPrepared[2]+1]={task, event}
elseif  (_task.getPriority()==pseudOS.Concurrent.Task.PRIORITY_MEDIUM) then tPrepared[3][#tPrepared[3]+1]={task, event}
elseif  (_task.getPriority()==pseudOS.Concurrent.Task.PRIORITY_LOW) then tPrepared[4][#tPrepared[4]+1]={task, event}
elseif  (_task.getPriority()==pseudOS.Concurrent.Task.PRIORITY_LOWEST) then tPrepared[5][#tPrepared[5]+1]={task, event}
end
_task.setState(pseudOS.Concurrent.Task.STATE_PREPARED)
end
local function getEvent()
local i=1
local event={os.pullEventRaw()}
if event[1]=="ctrl_msg-TopWaiting" then return end
while i<=#tEventWaiting do
  if tEventWaiting[i][2]==event[1] or tEventWaiting[i][2]==nil then
   setPrepared(tEventWaiting[i][1], event)
   table.remove(tEventWaiting,i)
  else
   i=i+1
  end
  os.queueEvent("ctrl_msg-TopWaiting")
  os.pullEventRaw("ctrl_msg-TopWaiting")
end
if event[1]=="task_terminated" then

end
end
local function executePlanifier()
local res1=true
while res1 do
  res1 = runNextTask()
  getEvent()
end
end

function pseudOS.Concurrent.Planifier.initMainTask(foo_)
task=pseudOS.Concurrent.Task.new(foo_)
task.setPriority(pseudOS.Concurrent.Task.PRIORITY_HIGH)
task.start()
executePlanifier()
end

---------------------------------------------------------
function pseudOS.Concurrent.Planifier.setPrepared(task_)
setPrepared(task_,nil)
end

function pseudOS.Concurrent.Planifier.getRunning()
return oTask_Running
end

pseudOS_Concurrent_Task
Spoiler

pseudOS.Concurrent.Task.STATE_CREATED=0
pseudOS.Concurrent.Task.STATE_RUNNING=1
pseudOS.Concurrent.Task.STATE_PREPARED=2
pseudOS.Concurrent.Task.STATE_SUSPENDED=3
pseudOS.Concurrent.Task.STATE_DEAD=4
pseudOS.Concurrent.Task.STATE_ERRORED=5

pseudOS.Concurrent.Task.PRIORITY_HIGHEST=1
pseudOS.Concurrent.Task.PRIORITY_HIGH=2
pseudOS.Concurrent.Task.PRIORITY_MEDIUM=3
pseudOS.Concurrent.Task.PRIORITY_LOW=4
pseudOS.Concurrent.Task.PRIORITY_LOWEST=5

local lastID=0
function pseudOS.Concurrent.Task.new( func )
local self={}

local id=lastID+1
lastID=lastID+1
local coroutin=coroutine.create(function()
			  func()
              --pseudOS.Concurrent.Planifier.setTermined(self)
			 end)
local state=pseudOS.Concurrent.Task.STATE_CREATED
local costate=nil
local childrens={}
--local parent=pseudOS.Concurrent.Planifier.getRunning()
--if(parent~=nil) then parent.addChildren(self,"continue") end
local daemon=false
local priority=pseudOS.Concurrent.Task.PRIORITY_MEDIUM


function self.getID()
  return id
end

function self.getCoroutine()
  return coroutin
end

function self.getState()
  return state, costate
end

function self.setState(state_)
  state=state_
end

function self.getChildrens()
  local aux={}
  for i=1,#childrens do
   aux[i]=childrens[i]
   coroutine.yield()
  end
  return aux
end

function self.addChildren(task_,sys_pass)
  if type(sys_pass)~="string" or sys_pass~="continue" then return false end
  childrens[#childrens+1]=task_
  return true
end

function self.getPriority()
  return priority
end

function self.setPriority( priority_ )
  assert(type(priority_)=="number", "expected a number, got "..type(priority_))
  assert(priority_==1 or priority_==2 or priority_==3 or priority_==4 or priority_==5 , "expected a valid priority (number), got"..priority_)
  priority=priority_
end

function self.start()
  pseudOS.Concurrent.Planifier.setPrepared(self)
end

return self
end

concurrentTest
Spoiler

_G.pseudOS={
Concurrent={
  Task={},
  Planifier={}
}
}
os.run(_G,"disk/pseudOS/APIS/Concurrent/pseudOS_Concurrent_Task")
os.run(_G,"disk/pseudOS/APIS/Concurrent/pseudOS_Concurrent_Planifier")
foo1=function()
  print("[TaskMain1]: create son 1 (time "..os.clock()..")")
  fooSon1=function()
  
   local str=""
   print("[son1] started")
   term.setCursorBlink(true)
   repeat
   event={os.pullEvent("key")}
   if #keys.getName(event[2])==1 then term.write(keys.getName(event[2])) str=str..keys.getName(event[2]) end
   until event[2]==28
   print()
   print(str)
  end
  taskSon1=pseudOS.Concurrent.Task.new(fooSon1)
  taskSon1.setPriority(pseudOS.Concurrent.Task.PRIORITY_LOW)
  taskSon1.start()
  coroutine.yield("ctrl_msg-Prepare")

   print("[TaskMain1]: create son 2 (time "..os.clock()..")")
  fooSon2=function()
   for i=1, 1500 do
	print("[TaskMain1_Son2]: loop "..i.." (time "..os.clock()..")")
	coroutine.yield("ctrl_msg-Prepare")
   end
  end
  taskSon2=pseudOS.Concurrent.Task.new(fooSon2)
  taskSon2.start()
  coroutine.yield("ctrl_msg-Prepare")

   print("[TaskMain1]: create son 3 (time "..os.clock()..")")
  fooSon3=function()
   for i=1, 1500 do
	print("[TaskMain1_Son3]: loop "..i.." (time "..os.clock()..")")
	coroutine.yield("ctrl_msg-Prepare")
   end
  end
  taskSon3=pseudOS.Concurrent.Task.new(fooSon3)
  taskSon3.setPriority(pseudOS.Concurrent.Task.PRIORITY_LOW)
  taskSon3.start()
  coroutine.yield("ctrl_msg-Prepare")

  print("main finish")
end
pseudOS.Concurrent.Planifier.initMainTask(foo1)

Thanks in advance.
Edited on 02 July 2016 - 05:19 PM
Bomb Bloke #2
Posted 03 July 2016 - 03:19 AM
local function executePlanifier()
	local res1=true
	while res1 do
		res1 = runNextTask()
		getEvent()
	end
end

runNextTask() only resumes a single coroutine per call. Consider what happens if you've got ten coroutines calling read(); you type a letter, each coroutine gets prepared to resume with that letter, but only one actually resumes and requests a further letter. You type a second letter, that goes to the coroutine that was allowed to resume before, but none of the others because they haven't had a chance to request another event yet…

The more coroutines you add in, the worse it gets. Your priority system compounds the issue further, I'm afraid.

You need to ensure that all coroutines that should be resumed with a given event have been resumed with that event before you consider which coroutines should be resumed with the next event. No exceptions!

Also remember that all events pulled come from the front of the queue, and all events queued go on the end. When you use a filter when pulling an event, events get pulled from the front and discarded until one of the correct type is found. Hence if you do stuff like this:

		os.queueEvent("ctrl_msg-TopWaiting")
		os.pullEventRaw("ctrl_msg-TopWaiting")

… then any other events that exist in the queue are gonna be lost.
AnotherName #3
Posted 03 July 2016 - 11:41 AM
Thanks Bomb Bloke.

[…]
The more coroutines you add in, the worse it gets. Your priority system compounds the issue further, I'm afraid.

You need to ensure that all coroutines that should be resumed with a given event have been resumed with that event before you consider which coroutines should be resumed with the next event. No exceptions!
[…]

But it means that I must execute all task that received a event immediately?
If so, it means that isn't possible to implement a planifier with priorities?
Edited on 03 July 2016 - 09:41 AM
LBPHacker #4
Posted 03 July 2016 - 12:29 PM
I'm just going to butt into this, sorry BB.

Schedulers (or planifiers, whatever) IRL are used to balance resources, such as keeping a process that requested, for example, a file from the disk frozen until said file can be read from memory, while giving CPU time to other processes that need no files or have already loaded whatever files they're operating with and are actually doing useful work. They might also decide which processes are to be given more CPU time than others, depending on their priority.

In CC there are not too many resources you can balance, and the event model and the environment make it even more difficult to find these things. You can't really balance CPU load, since Lua multitasking is cooperative, not preemptive, so coroutines themselves decide when to yield, not the scheduler. The only other thing that can get coroutines to yield in CC is a piece of code in Java throwing "too long without yielding" errors, but that's pretty much deus ex machina. You can't balance I/O load either since the filesystem functions are blocking as it is. You can't balance events because it makes no sense to do so. What else is there then?

The only use I can see for prioritized scheduling in CC is to ensure that some low level processes (such as a GUI framework driver) get the events before userspace applications do, thus eliminating any chance if the framework falling behind and returning old data when queried by the userspace applications. This could be implemented using a simple well-organized array, which you probably already have anyway in a coroutine manager. Or you could just have multiple arrays, grouping coroutines by priority and resuming the coroutines in the highest priority array first, etc. But this would still just pass an event to every coroutine as soon as possible.


TL;DR: It is possible, but probably not the way you imagined.
Edited on 03 July 2016 - 10:37 AM