calInThread vs callFromThread

twistedmatrix에서 callInThread()와 callFromThread()는 어떻게 다른 것일까?
무심코, 코드에서 callInThread()를 썼는데, 그 차이를 궁금해 하다가 문서 및 소스를 찾아보게
되었다.

1. The Twisted Documentation 보기
우선 살펴본, The Twisted Documentation에서는 callFromThread()는 thread-safe하지 않은 메소드를 thread-safe하게 실행시킬 때 사용한다고 한다. 다른 설명으로는, thread-safe하지 않은 메소드를 메인 이벤트 루프에서 실행되도록 스케쥴한다고 한다.

from twisted.internet import reactor

def notThreadSafe(x):
   """ do something that isn't thread-safe """
   # ...

def threadSafeScheduler():
   """ Run in thread-safe manner."""
   reactor.callFromThread(noThreadSafe, 3)

반면, callInThread()는 thread상에서 메소드들을 호출하고 싶을 때 사용하며, 이 메소드들이 thread pool에 의해 queue처리 되면서 실행되도록 한다고 한다.

from twisted.internet import reactor

def aSillyBlockingMethod(x):
   import time
   time.sleep(2)
   print x

# run method in thread
reactor.callInThread(aSillyBlockiingMethod, "2 seconds have passed")


이것을 본 소감은..

1. "그게 뭔데~ 어쩌라고~"
2. no thread-safe 함수는 callFromThread()를 쓰고, safe 함수는 callInThread()를 쓰는 것인가?
3. blocking이 내부적으로 발생할 수 있는 함수는 callInThread()를 쓰는 것인가?

등 용도가 명확하지 않았다.



2. API 찾아보기
그래서, twisted API로부터, callInThread()와 callFromThread()에 대한 설명을 참고하게 되었다.
아래는 각각 callFromThread()와 callInThread()에 대한 API 설명이다.

[callFromThread()]

Run the callable object in a separate thread.


[callInThread()]

Cause a function to be executed by the reactor thread.

Use this method when you want to run a function in the reactor's thread from another thread. Calling callFromThread should wake up the main thread (where reactor.run() is executing) and run the given callable in that thread.

Obviously, the callable must be thread safe. (If you want to call a function in the next mainloop iteration, but you're in the same thread, use callLater with a delay of 0.)

위의 설명을 따른다면, callFromThread()는 별도의 thread를 띄워서 실행시켜 준다는 것이나,
callInThread()는 reactor의 내부 thread에 위탁한다는 것으로 보인다.




3. 다른 참고 문서에서는..
한편, 다른 문서에서는 각 메소드의 사용 용도에 차이를 다음과 같이 두었다.

def threadDoSomething(a, b):
   d = defer.Deferred()
   def func():
      result = doSomething(a, b)
      reactor.callFromThread(d.callback, result)
   t = threading.Thread(target = func)
   t.start()
   return d

def threadDoSomethingNew(a, b):
   d = defer.Deferred()
   def func():
      result = doSomething(a, b)
      reactor.callFromThread(d.callback, result)
   reactor.callInThread(func)
   return d

위의 사용예와 해당 문서의 설명에 따르면, reactor.callFromThread()는 twisted 중, 유일하게 thread-safe 메소드이며, reactor thread의 내부에서 주어진 함수를 실행시킨다. 첫번째 함수는 정상동작하나, 반복적으로 호출하면 새로운 thread를 계속 생성하므로 시스템에 부하를 주므로, 두번째 함수와 같이 사용하는 것이 효과적이다.

결과적으로, callInThread()는  twisted를 사용하지 않는 방식에서 threading.Thread().start() 대신에 사용하면 된다는 것이다.

from twisted.internet import reactor

c = 1
# this triial example function is
# not thread-safe
def increment(x):
   global c
   c = c + x

def threadTask():
   # run non-threadsafe function
   # in thread-safe way -- increment(7)
   # will be called in event loop thread
   reactor.callFromThread(increment, 7)

# add a number of tasks to thread pool
for i in range(10):
   reactor.callInThread(threadTask)


출처 : Network Programming for the Rest of Us, Glyph Lefkowitz, http://www.twistedmatrix.com/users/glyph

위의 예 대로라면, thread를 launch 시킬 때는 callInThread()를 쓰고, callFromThread()를 사용하여 thread-safe하게 동작하도록 해준다는 것이다.

그래도, 미심쩍은 면은 있다!!

4. 소스에서는?
한편, 다른 문서에서는 각 메소드의 사용 용도에 차이를 다음과 같이 두었다.

[callFromThread]

추적해보니, base.py에 정의되어 있음.

def callFromThread(self, f, *args, **kw):
    """See twisted.internet.interfaces.IReactorThreads.callFromThread.
    """
    assert callable(f), "%s is not callable" % (f,)
    # lists are thread-safe in CPython, but not in Jython
    # this is probably a bug in Jython, but until fixed this code
    # won't work in Jython.
    self.threadCallQueue.append((f, args, kw))
    self.wakeUp()

threadCallQueue는 ReactorBse에 선언된 변수로서, list이다. callFromThread()가 호출될 때마다
내부의 threadCallQueue 리스트에 추가되고, runUntilCurrent()라는 내부 메소드가 실행 되면 wakeup 해서, queue 내부의 함수를 순차적으로 시행하고, dequeue시킨다.

[callInThread]
추적해보니, base.py 및 threadpool.py에 정의되어 있음.

def callInThread(self, _callable, *args, **kwargs):
    """See twisted.internet.interfaces.IReactorThreads.callInThread.
    """
    if self.threadpool is None:
    self._initThreadPool()
    self.threadpool.callInThread(_callable, *args, **kwargs)

def callInThread(self, func, *args, **kw):
    if self.joined:
    return
    ctx = context.theContextTracker.currentContext().contexts[-1]
    o = (ctx, func, args, kw)
    self.q.put(o)
    if self.started:
        self._startSomeWorkers()

위의 코드에서 보는 바와 같이, callInThread를 호출하면, 내부에 정의한 Queue.Queue 객체에
함수를 넣고, thread pool에 정의된 수에 따라 동작시킨다. 내부적으로 threading.Thread()를
미리 실행시켜서 queue로 보관하고 있다.

결과적으로, callFromThread를 사용하면, 내부의 이벤트 루프에 넣어서 실행되도록 하고, callInThread를 사용하면, 내부에 생성해 놓은 thread pool로부터 실행하는 것이다라는 설명이 맞다는 뜻 되겠다~

따라서, 새로 시작하는 thread를 callInThread에 넣어서 시작시키고, 내부적으로 callFromThread를 타도록 시키는 게 맞겠다. 문서에 나와 있는 바와 같이, twisted API는 callFromThread를 통해서 호출하라고 한 것도 같은 맥락으로 보인다.

댓글

Designed by JB FACTORY