JS Task API Examples: composing tasks
Introduction
Each Task Executor method takes a task function as a parameter. The task function is asynchronous and receives an ExeUnit object as one of its parameters.
A task function may be very simple, consisting of a single command, or it may consist of a set of steps that include running commands or sending data to and from providers.
Commands can be run in sequence or can be chained in batches. Depending on how you define your batch, you can obtain results of different types.
The following commands are currently available:
Command | Available in node.js | Available in web browser |
---|---|---|
run() | yes | yes |
runAndStream() | yes | yes |
uploadFile() | yes | no |
uploadJson() | yes | yes |
downloadFile() | yes | no |
uploadData() | yes | yes |
downloadData() | no | yes |
downloadJson() | no | yes |
This article focuses on the run() and runAndStream() commands and on chaining commands using the beginBatch() method. Examples for the uploadFile(), uploadJson(), and downloadFile() commands can be found in the Transferring Data article.
We'll start with a simple example featuring a single run() command. Then, we'll focus on organizing a more complex task that requires a series of steps:
- send a worker.mjs script to the provider (this is a simple script that prints "Hello Golem World!" in the terminal),
- run worker.mjs on the provider and save the output to a file (output.txt), and finally
- download the output.txt file back to your computer.
Prerequisites
Yagna service is installed and running with the try_golem app-key configured.
How to run examples
Create a project folder, initialize a Node.js project, and install libraries.
mkdir golem-example
cd golem-example
npm init
npm i @golem-sdk/task-executor
npm i @golem-sdk/pino-logger
Copy the code into the index.mjs file in the project folder and run:
node index.mjs
Some of the examples require a simple worker.mjs script. In a POSIX shell (bash, zsh), it can be created with the following command (in the Windows command prompt, omit the single quotes):
echo 'console.log("Hello Golem World!");' > worker.mjs
Running a single command
Below is an example of a simple script that remotely executes node -v.
import { TaskExecutor } from "@golem-sdk/task-executor";
import { pinoPrettyLogger } from "@golem-sdk/pino-logger";

(async () => {
  const executor = await TaskExecutor.create({
    logger: pinoPrettyLogger({ level: "info" }),
    api: { key: "try_golem" },
    demand: {
      workload: {
        imageTag: "golem/node:20-alpine",
      },
    },
    market: {
      rentHours: 0.5,
      pricing: {
        model: "linear",
        maxStartPrice: 0.5,
        maxCpuPerHourPrice: 1.0,
        maxEnvPerHourPrice: 0.5,
      },
    },
  });

  try {
    const result = await executor.run(async (exe) => (await exe.run("node -v")).stdout);
    console.log("Task result:", result);
  } catch (err) {
    console.error("An error occurred:", err);
  } finally {
    await executor.shutdown();
  }
})();
Note that exe.run() accepts a string as an argument. This string is a command invocation, executed exactly as one would do in the console. The command will be run in the folder defined by the WORKDIR entry in your image definition.
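As a minimal sketch of working with the value returned by exe.run(), the helper below returns the trimmed stdout of a single command and throws when the command is not reported as successful. The names runOrThrow and fakeExe are hypothetical and not part of the SDK. The sketch assumes that the result object exposes result, stdout, and stderr fields, and that result equals "Ok" on success. The fakeExe object only stands in for the real ExeUnit so that the code runs without a provider.

```javascript
// Hypothetical helper (not part of the SDK): run one command and return its
// trimmed stdout, or throw if the command is not reported as successful.
// Assumption: the result object exposes `result`, `stdout` and `stderr`,
// and `result` is "Ok" on success.
async function runOrThrow(exe, command) {
  const res = await exe.run(command);
  if (res.result !== "Ok") {
    throw new Error(`Command "${command}" failed: ${res.stderr ?? "no stderr"}`);
  }
  return String(res.stdout ?? "").trim();
}

// Stand-in for the real ExeUnit object so the sketch runs without a provider.
const fakeExe = {
  async run(command) {
    if (command === "node -v") {
      return { result: "Ok", stdout: "v20.11.0\n", stderr: "" };
    }
    return { result: "Error", stdout: "", stderr: `${command}: not found` };
  },
};

(async () => {
  console.log(await runOrThrow(fakeExe, "node -v"));
  try {
    await runOrThrow(fakeExe, "nope");
  } catch (err) {
    console.log(err.message);
  }
})();
```

Inside a real task function, the same helper could be called as runOrThrow(exe, "node -v"), provided the assumptions about the result object hold for your SDK version.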
Running multiple commands (prosaic way)
Your task function can consist of multiple steps, all run on the same ExeUnit through the exe object.
import { TaskExecutor } from "@golem-sdk/task-executor";
import { pinoPrettyLogger } from "@golem-sdk/pino-logger";

(async () => {
  const executor = await TaskExecutor.create({
    logger: pinoPrettyLogger({ level: "info" }),
    api: { key: "try_golem" },
    demand: {
      workload: {
        imageTag: "golem/node:20-alpine",
      },
    },
    market: {
      rentHours: 0.5,
      pricing: {
        model: "linear",
        maxStartPrice: 0.5,
        maxCpuPerHourPrice: 1.0,
        maxEnvPerHourPrice: 0.5,
      },
    },
  });

  try {
    const result = await executor.run(async (exe) => {
      await exe.uploadFile("./worker.mjs", "/golem/input/worker.mjs");
      await exe.run("node /golem/input/worker.mjs > /golem/input/output.txt");
      const result = await exe.run("cat /golem/input/output.txt");
      await exe.downloadFile("/golem/input/output.txt", "./output.txt");
      return result.stdout;
    });
    console.log(result);
  } catch (err) {
    console.error("An error occurred:", err);
  } finally {
    await executor.shutdown();
  }
})();
To ensure the proper sequence of execution, all calls must be awaited. We only handle the result of the second run() and ignore the others.
If you use this approach, each command is sent separately to the provider and then executed.
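The effect of awaiting each call can be illustrated without a provider. The sketch below uses a hypothetical stand-in, makeFakeExe, whose methods complete after fixed delays and record when they finish. It is not the SDK's ExeUnit, and the delays are invented for illustration. It compares awaiting every step with starting all steps at once.

```javascript
// Stand-in for the real ExeUnit: every call completes after a fixed delay
// and records its name in `log` when it finishes.
function makeFakeExe(log) {
  const step = (name, ms) =>
    new Promise((resolve) =>
      setTimeout(() => {
        log.push(name);
        resolve({ stdout: name });
      }, ms),
    );
  return {
    uploadFile: () => step("upload", 60),
    run: () => step("run", 30),
    downloadFile: () => step("download", 10),
  };
}

// Each call is awaited, so a step starts only after the previous one finished.
async function awaitedSteps(exe) {
  await exe.uploadFile("./worker.mjs", "/golem/input/worker.mjs");
  await exe.run("node /golem/input/worker.mjs > /golem/input/output.txt");
  await exe.downloadFile("/golem/input/output.txt", "./output.txt");
}

// Anti-pattern: all calls are started at once, so they finish in order of
// duration rather than in the order they were written.
async function concurrentSteps(exe) {
  await Promise.all([
    exe.uploadFile("./worker.mjs", "/golem/input/worker.mjs"),
    exe.run("node /golem/input/worker.mjs > /golem/input/output.txt"),
    exe.downloadFile("/golem/input/output.txt", "./output.txt"),
  ]);
}

(async () => {
  const awaitedLog = [];
  await awaitedSteps(makeFakeExe(awaitedLog));
  console.log("awaited:   ", awaitedLog.join(" -> "));

  const concurrentLog = [];
  await concurrentSteps(makeFakeExe(concurrentLog));
  console.log("concurrent:", concurrentLog.join(" -> "));
})();
```

With the stand-in, the awaited version finishes upload, run, download in that order, while the concurrent version finishes the shortest step first. How the real SDK behaves when calls are not awaited may differ, so treat this only as an illustration of why the awaits matter.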
Organizing commands into batches
Now, let's take a look at how you can arrange multiple commands into batches. Depending on how you finalize your batch, you will obtain either:
- an array of result objects, or
- an RxJS Observable.
Organizing commands into a batch resulting in a Promise of an array of results
Use the beginBatch() method and chain commands followed by .end().
import { TaskExecutor } from "@golem-sdk/task-executor";
import { pinoPrettyLogger } from "@golem-sdk/pino-logger";

(async () => {
  const executor = await TaskExecutor.create({
    logger: pinoPrettyLogger(),
    api: { key: "try_golem" },
    demand: {
      workload: {
        imageTag: "golem/node:20-alpine",
      },
    },
    market: {
      rentHours: 0.5,
      pricing: {
        model: "linear",
        maxStartPrice: 0.5,
        maxCpuPerHourPrice: 1.0,
        maxEnvPerHourPrice: 0.5,
      },
    },
  });

  try {
    const result = await executor.run(async (exe) => {
      return (
        await exe
          .beginBatch()
          .uploadFile("./worker.mjs", "/golem/input/worker.mjs")
          .run("node /golem/input/worker.mjs > /golem/input/output.txt")
          .run("cat /golem/input/output.txt")
          .downloadFile("/golem/input/output.txt", "./output.txt")
          .end()
      )[2]?.stdout;
    });
    console.log(result);
  } catch (error) {
    console.error("Computation failed:", error);
  } finally {
    await executor.shutdown();
  }
})();
All commands after .beginBatch() are run in sequence. The chain is terminated with .end(). The output is a Promise of an array of result objects. They are stored at indices according to their position in the command chain (the first command after beginBatch() has an index of 0).
The output of the 3rd command, run('cat /golem/input/output.txt'), is under the index of 2.
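Picking results by numeric index can become hard to read as a chain grows. One possible approach is sketched below: a hypothetical helper, labelBatchResults, pairs each result with a step name chosen by the caller. The sample array is made up to resemble what a batch ending with .end() resolves to, and the result and stdout fields are the only ones it assumes.

```javascript
// Hypothetical helper (not part of the SDK): pair each batch result with a
// step name, relying on results being stored in command order.
function labelBatchResults(stepNames, results) {
  if (stepNames.length !== results.length) {
    throw new Error(`Expected ${stepNames.length} results, got ${results.length}`);
  }
  return Object.fromEntries(stepNames.map((name, index) => [name, results[index]]));
}

// Made-up sample in the shape of the array a batch ending with .end() resolves to.
const sampleResults = [
  { result: "Ok", stdout: null }, // uploadFile
  { result: "Ok", stdout: null }, // run: node worker.mjs > output.txt
  { result: "Ok", stdout: "Hello Golem World!\n" }, // run: cat output.txt
  { result: "Ok", stdout: null }, // downloadFile
];

const labelled = labelBatchResults(["upload", "runWorker", "cat", "download"], sampleResults);
console.log(labelled.cat.stdout);
```

Here labelled.cat refers to the same object as sampleResults[2], so the lookup stays correct as long as the list of names is kept in the same order as the chained commands.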
Organizing commands into a batch producing an Observable
To produce an Observable, use the beginBatch() method and chain commands, followed by endStream().
import { TaskExecutor } from "@golem-sdk/task-executor";
import { pinoPrettyLogger } from "@golem-sdk/pino-logger";

(async () => {
  const executor = await TaskExecutor.create({
    logger: pinoPrettyLogger({ level: "info" }),
    api: { key: "try_golem" },
    demand: {
      workload: {
        imageTag: "golem/node:20-alpine",
      },
    },
    market: {
      rentHours: 0.5,
      pricing: {
        model: "linear",
        maxStartPrice: 0.5,
        maxCpuPerHourPrice: 1.0,
        maxEnvPerHourPrice: 0.5,
      },
    },
  });

  try {
    await executor.run(async (exe) => {
      const res = await exe
        .beginBatch()
        .uploadFile("./worker.mjs", "/golem/input/worker.mjs")
        .run("node /golem/input/worker.mjs > /golem/input/output.txt")
        .run("cat /golem/input/output.txt")
        .downloadFile("/golem/input/output.txt", "./output.txt")
        .endStream();

      // Settle the promise when the stream completes or fails,
      // so the task does not hang if the stream reports an error
      return new Promise((resolve, reject) => {
        res.subscribe({
          next: (result) => console.log(result),
          error: (error) => reject(error),
          complete: () => resolve(),
        });
      });
    });
  } catch (error) {
    console.error("Computation failed:", error);
  } finally {
    await executor.shutdown();
  }
})();
Note that in this case, as the chain ends with .endStream(), we can read data chunks from the Observable, denoted as res.
Once the stream is completed, we can terminate our TaskExecutor instance.
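If you prefer to gather everything the stream emits before continuing, the subscription can be wrapped in a Promise. The following is a minimal sketch of that pattern. The names collectStream and fakeObservable are hypothetical, and fakeObservable is only a stand-in that mimics the subscribe({ next, error, complete }) shape so the code runs without a provider.

```javascript
// Hypothetical helper: gather every emitted item into an array, resolve on
// completion and reject if the stream reports an error.
function collectStream(observable) {
  return new Promise((resolve, reject) => {
    const items = [];
    observable.subscribe({
      next: (item) => items.push(item),
      error: (err) => reject(err),
      complete: () => resolve(items),
    });
  });
}

// Minimal stand-in with the same subscribe({ next, error, complete }) shape.
// It emits the given values, then either fails with `failWith` or completes.
function fakeObservable(values, failWith) {
  return {
    subscribe({ next, error, complete }) {
      for (const value of values) next(value);
      if (failWith) error(failWith);
      else complete();
    },
  };
}

(async () => {
  const items = await collectStream(fakeObservable(["chunk 1", "chunk 2"]));
  console.log(items);
  try {
    await collectStream(fakeObservable(["chunk 1"], new Error("stream failed")));
  } catch (err) {
    console.log("rejected:", err.message);
  }
})();
```

When the object is a genuine RxJS Observable and the rxjs package is available, lastValueFrom(res.pipe(toArray())) should give a similar result with less code. The hand-written version above is shown only to make the resolve and reject paths explicit.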
Running commands and collecting output as a stream
Here are two examples of how to run a command and collect its output as a stream.
Basic runAndStream scenario
In the first example, we run a command that produces both stdout and stderr outputs that we pass to the console. This command will terminate on its own after ten cycles.
import { TaskExecutor } from "@golem-sdk/task-executor";
import { pinoPrettyLogger } from "@golem-sdk/pino-logger";

(async function main() {
  const executor = await TaskExecutor.create({
    logger: pinoPrettyLogger(),
    api: { key: "try_golem" },
    demand: {
      workload: {
        // What do you want to run
        imageTag: "golem/alpine:latest",
      },
    },
    market: {
      rentHours: 0.5,
      pricing: {
        model: "linear",
        maxStartPrice: 0.5,
        maxCpuPerHourPrice: 1.0,
        maxEnvPerHourPrice: 0.5,
      },
    },
    task: {
      // Control the execution of tasks
      taskTimeout: 5 * 60 * 1000,
    },
  });

  try {
    const result = await executor.run(async (exe) => {
      console.log("Provider deployed");
      await exe.run(
        `echo 'counter=0; while [ $counter -lt 10 ]; do ls ./home non-existing-file; sleep 1; counter=$(($counter+1)); done' > script.sh`,
      );
      await exe.run("chmod 700 ./script.sh");
      const remoteProcess = await exe.runAndStream("/bin/sh ./script.sh");
      remoteProcess.stderr.subscribe((data) => console.error("stderr: ", data));
      await new Promise((resolve) => {
        remoteProcess.stdout.subscribe({
          next: (data) => console.log("stdout: ", data),
          complete: () => resolve(),
        });
      });
    });
    console.log(result);
  } catch (err) {
    console.error("Running the task on Golem failed due to", err);
  } finally {
    await executor.shutdown();
  }
})();
runAndStream scenario with timeout defined
In this example, we show how to use remoteProcess.waitForExit() with a timeout to stop waiting for a streamed process. Note that in the current implementation, an exit caused by the timeout terminates the activity on the provider, so you cannot run another command on that provider. The Task Executor will instead run the next task on another provider.
import { TaskExecutor } from "@golem-sdk/task-executor";
import { pinoPrettyLogger } from "@golem-sdk/pino-logger";

const executor = await TaskExecutor.create({
  logger: pinoPrettyLogger(),
  api: { key: "try_golem" },
  demand: {
    workload: {
      imageTag: "golem/alpine:latest",
    },
  },
  market: {
    rentHours: 0.5,
    pricing: {
      model: "linear",
      maxStartPrice: 0.5,
      maxCpuPerHourPrice: 1.0,
      maxEnvPerHourPrice: 0.5,
    },
  },
  task: {
    maxParallelTasks: 1,
  },
});

// the example will run a task 4 times, in sequence (as maxParallelTasks is 1)
for (const i of [1, 2, 3, 4]) {
  await executor
    .run(async (exe) => {
      // each task will spawn a script that generates a sequence of 5 pairs of messages
      // sent to stdout and stderr, separated by a 1 sec delay

      // the command generating the sequence is saved to the script.sh file
      await exe.run(
        `echo 'counter=0; while [ $counter -lt 5 ]; do ls -ls ./script.sh non-existing-file; sleep 1; counter=$(($counter+1)); done' > script.sh`,
      );
      // permissions are modified to be able to run the script
      await exe.run("chmod 700 ./script.sh");

      // the script is run with streamed results; stdout and stderr are processed
      const remoteProcess = await exe.runAndStream("/bin/sh ./script.sh");

      remoteProcess.stdout.subscribe((data) => console.log(`iteration: ${i}:`, "stdout>", data));
      remoteProcess.stderr.subscribe((data) => console.error(`iteration: ${i}:`, "stderr>", data));

      // For odd tasks, we set the streaming timeout to 10 secs, so the script will end normally.
      // For even tasks, we will exit the run method after 3 secs.
      // The exit caused by timeout will terminate the activity on a provider,
      // therefore the user cannot run another command on the provider.
      // Task executor will run the next task on another provider.
      const timeout = i % 2 === 0 ? 3_000 : 10_000;
      const finalResult = await remoteProcess.waitForExit(timeout).catch(async (e) => {
        console.log(`Iteration: ${i} Error: ${e.message}, Provider: ${exe.provider.name}`);
        exe
          .run("ls -l")
          .catch((e) =>
            console.log("Running command after a timed-out runAndStream exit is NOT possible, you will get an error:\n", e),
          );
      });
      if (finalResult) {
        // if the spawn exited without timeout, the provider is still available
        console.log(`Iteration: ${i} results: ${finalResult?.result}. Provider: ${exe.provider.name}`);
        console.log("Running command after normal runAndStream exit is possible:", (await exe.run("ls -l")).stdout);
      }
    })
    .catch((error) => console.error("Execution of task failed due to error.", error));
}

await executor.shutdown();
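The branching on the outcome of waitForExit() can also be expressed as a small helper that reports what happened instead of throwing. The sketch below is an illustration only. The names waitOrReport and fakeRemoteProcess are hypothetical, and fakeRemoteProcess is a stand-in built on timers rather than a description of how the SDK implements waitForExit(). It only assumes that waitForExit(timeout) resolves with a result object on a normal exit and rejects on a timeout, as in the example above.

```javascript
// Stand-in for the object returned by runAndStream(): the "process" finishes
// after durationMs, and waitForExit(timeoutMs) rejects if that takes too long.
function fakeRemoteProcess(durationMs) {
  return {
    waitForExit(timeoutMs) {
      return new Promise((resolve, reject) => {
        const finish = setTimeout(() => {
          clearTimeout(guard);
          resolve({ result: "Ok" });
        }, durationMs);
        const guard = setTimeout(() => {
          clearTimeout(finish);
          reject(new Error(`timed out after ${timeoutMs} ms`));
        }, timeoutMs);
      });
    },
  };
}

// Hypothetical helper: report the outcome instead of throwing, so the caller
// can decide whether the provider may still be used.
async function waitOrReport(remoteProcess, timeoutMs) {
  try {
    const finalResult = await remoteProcess.waitForExit(timeoutMs);
    return { exited: true, finalResult };
  } catch (error) {
    return { exited: false, error };
  }
}

(async () => {
  for (const i of [1, 2]) {
    // Same odd/even rule as in the example above, scaled down to milliseconds
    const timeoutMs = i % 2 === 0 ? 20 : 200;
    const outcome = await waitOrReport(fakeRemoteProcess(60), timeoutMs);
    if (outcome.exited) {
      console.log(`iteration ${i}: exited with ${outcome.finalResult.result}, provider can be reused`);
    } else {
      console.log(`iteration ${i}: ${outcome.error.message}, do not run further commands`);
    }
  }
})();
```

Note that the helper treats every rejection as a reason to stop using the provider. In a real task you may want to inspect the error before deciding.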