Tests: Implement export() for task access
This commit is contained in:
@@ -5,6 +5,7 @@ import tempfile
|
|||||||
import shutil
|
import shutil
|
||||||
import atexit
|
import atexit
|
||||||
import unittest
|
import unittest
|
||||||
|
import json
|
||||||
from .utils import (run_cmd_wait, run_cmd_wait_nofail, which,
|
from .utils import (run_cmd_wait, run_cmd_wait_nofail, which,
|
||||||
task_binary_location)
|
task_binary_location)
|
||||||
from .exceptions import CommandError
|
from .exceptions import CommandError
|
||||||
@@ -146,6 +147,16 @@ class Task(object):
|
|||||||
with open(self.taskrc, "r") as f:
|
with open(self.taskrc, "r") as f:
|
||||||
return f.readlines()
|
return f.readlines()
|
||||||
|
|
||||||
|
def export(self, export_filter=None):
|
||||||
|
"""Run "task export", return JSON array of exported tasks."""
|
||||||
|
if export_filter is None:
|
||||||
|
export_filter = ""
|
||||||
|
|
||||||
|
code, out, err = self.runSuccess("rc.json.array=1 {0} export"
|
||||||
|
"".format(export_filter))
|
||||||
|
|
||||||
|
return json.loads(out)
|
||||||
|
|
||||||
def runSuccess(self, args=(), input=None, merge_streams=False,
|
def runSuccess(self, args=(), input=None, merge_streams=False,
|
||||||
timeout=5):
|
timeout=5):
|
||||||
"""Invoke task with given arguments and fail if exit code != 0
|
"""Invoke task with given arguments and fail if exit code != 0
|
||||||
|
|||||||
Reference in New Issue
Block a user