Generating 100k events per second

Natarajan Santhosh
14 min read · May 14, 2023

Generating 100k events per second is a challenging task, and the achievable rate depends largely on the hardware running the code. Here is a TypeScript example that uses the setInterval method to emit a randomly chosen syslog line at a fixed interval:

RUN on online IDE


// Sample syslog lines used as event payloads

let sylogs: string[] = ['May 14 00:00:01 Natarajans-MacBook-Pro-2 com.apple.xpc.launchd[1] (homebrew.mxcl.redis[27562]): Service exited due to SIGILL | sent by exc handler[27562]',
'May 14 00:00:01 Natarajans-MacBook-Pro-2 com.apple.xpc.launchd[1] (homebrew.mxcl.redis): Service only ran for 0 seconds. Pushing respawn out by 10 seconds.',
'May 14 00:00:08 Natarajans-MacBook-Pro-2 com.apple.xpc.launchd[1] (com.launch.munbyn): Service only ran for 0 seconds. Pushing respawn out by 10 seconds.',
'May 14 00:00:11 Natarajans-MacBook-Pro-2 com.apple.xpc.launchd[1] (homebrew.mxcl.redis[27570]): Service exited due to SIGILL | sent by exc handler[27570]',
'May 14 00:00:11 Natarajans-MacBook-Pro-2 com.apple.xpc.launchd[1] (homebrew.mxcl.redis): Service only ran for 0 seconds. Pushing respawn out by 10 seconds.',
'May 14 00:00:18 Natarajans-MacBook-Pro-2 com.apple.xpc.launchd[1] (com.launch.munbyn): Service only ran for 0 seconds. Pushing respawn out by 10 seconds.',
'May 14 00:00:21 Natarajans-MacBook-Pro-2 com.apple.xpc.launchd[1] (homebrew.mxcl.redis[27577]): Service exited due to SIGILL | sent by exc handler[27577]',
'May 14 00:00:21 Natarajans-MacBook-Pro-2 com.apple.xpc.launchd[1] (homebrew.mxcl.redis): Service only ran for 0 seconds. Pushing respawn out by 10 seconds.',
'May 14 00:00:26 Natarajans-MacBook-Pro-2 com.apple.xpc.launchd[1] (com.apple.mdworker.shared.10000000-0500-0000-0000-000000000000[27452]): Service exited due to SIGKILL | sent by mds[113]',
'May 14 00:00:26 Natarajans-MacBook-Pro-2 com.apple.xpc.launchd[1] (com.apple.mdworker.shared.08000000-0400-0000-0000-000000000000[27418]): Service exited due to SIGKILL | sent by mds[113]']

let count = 0;

setInterval(() => {
  count++;
  // Pick a random index from 0 to sylogs.length - 1
  const randomIndex = Math.floor(Math.random() * sylogs.length);

  // Use the log line at that index as the event payload
  const randomLog = sylogs[randomIndex];
  console.log(`Event ${count} generated: ${randomLog}`);
}, 1000); // The delay is in milliseconds: one event per second

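As written, this code generates one event per second, because the interval is 1000 milliseconds. setInterval takes its delay in milliseconds, and Node.js treats any delay below 1 as 1, so a timer on its own cannot fire every 10 microseconds. To reach 100,000 events per second you need to emit a batch of events on each tick, for example 100 events every millisecond or 1,000 events every 10 milliseconds. You can adjust the interval and batch size to raise or lower the rate based on your hardware limitations.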
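The batching idea can be sketched as follows. This is a minimal illustration rather than a tuned implementation: the names generateBatch, startGenerator, and sampleLogs are hypothetical, and because timers drift under load, the actual rate will only approximate the target.

```typescript
// Hypothetical sample payloads; substitute your own log lines.
const sampleLogs: string[] = [
  'May 14 00:00:01 host launchd[1]: Service exited due to SIGILL',
  'May 14 00:00:08 host launchd[1]: Service only ran for 0 seconds',
];

// Build `batchSize` events, numbered from `startCount + 1` onwards.
function generateBatch(startCount: number, batchSize: number, logs: string[]): string[] {
  const events: string[] = [];
  for (let i = 1; i <= batchSize; i++) {
    const line = logs[Math.floor(Math.random() * logs.length)];
    events.push(`Event ${startCount + i}: ${line}`);
  }
  return events;
}

// Emit roughly `eventsPerSecond` events by handing one batch to `emit` per tick.
function startGenerator(
  eventsPerSecond: number,
  tickMs: number,
  emit: (events: string[]) => void,
): ReturnType<typeof setInterval> {
  const batchSize = Math.round((eventsPerSecond * tickMs) / 1000);
  let count = 0;
  return setInterval(() => {
    emit(generateBatch(count, batchSize, sampleLogs));
    count += batchSize;
  }, tickMs);
}
```

For example, startGenerator(100_000, 10, (events) => process.stdout.write(events.join('\n') + '\n')) asks for 1,000 events every 10 milliseconds; pass the returned timer to clearInterval() to stop it. Writing each batch with a single call, rather than one console.log per event, keeps the per-event overhead low.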

TypeScript application in a Docker container

You can use a Dockerfile that starts from a Node.js base image and compiles the TypeScript code. Here’s an example Dockerfile:

FROM node:14-alpine
WORKDIR /app
COPY package*.json ./
RUN npm install
COPY . .
RUN npm run build
CMD ["npm", "start"]

Let’s go through each line of this Dockerfile:

  1. FROM node:14-alpine: This sets the base image to Node.js 14 running on Alpine Linux, a lightweight distribution.
  2. WORKDIR /app: This sets the working directory to /app.
  3. COPY package*.json ./: This copies the package.json and package-lock.json files to the working directory.
  4. RUN npm install: This installs the dependencies specified in the package.json file.
  5. COPY . .: This copies the rest of the project from the build context into the working directory.
  6. RUN npm run build: This compiles the TypeScript code into JavaScript. The build script must be defined in the scripts section of your package.json file.
  7. CMD ["npm", "start"]: This specifies the command to run when the container starts. In this case, it runs the start command defined in the package.json file.

To build and run the Docker image, you can use the following commands:

docker build -t myapp .
docker run myapp

Replace myapp with a name of your choice for the Docker image. This will create a new Docker image based on the Dockerfile and run it in a container. The CMD instruction will start the application when the container is launched.

To send the contents of an array to a TCP server in TypeScript

You can use the built-in net module in Node.js. Here’s an example code snippet that demonstrates how to do this:

import * as net from 'net';

const host = 'localhost';
const port = 3000;

const data = [1, 2, 3, 4, 5];

const socket = new net.Socket();

socket.connect(port, host, () => {
  console.log(`Connected to ${host}:${port}`);
  socket.write(JSON.stringify(data));
});

socket.on('data', (data) => {
  console.log(`Received: ${data}`);
  socket.destroy();
});

socket.on('close', () => {
  console.log('Connection closed');
});

In this code snippet, we first import the net module. Then we define the host and port variables that specify the TCP server's address.

Next, we define an array of data that we want to send to the server. In this case, the data is a simple array of integers.

We create a new net.Socket object and call its connect method to connect to the TCP server at the specified host and port. In the callback function, we use the write method to send the contents of the array to the server after converting it to a JSON string.

We register event listeners for the data and close events on the socket. The data event is fired when the server sends a response back, and we log the received data to the console. The close event is fired when the socket is closed, and we log a message to the console to indicate that the connection was closed.

You can run this TypeScript code using a tool like ts-node or compile it to JavaScript and run it with node. Just make sure that the TCP server is running and listening for incoming connections at the specified host and port.
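If you do not have a server to test against, a small echo server is enough to exercise the client. The sketch below is an assumption for local testing only: createEchoServer and listen are hypothetical helper names, and the server simply writes back whatever it receives.

```typescript
import * as net from 'net';

// Create a TCP server that echoes every chunk it receives back to the sender.
function createEchoServer(): net.Server {
  return net.createServer((socket) => {
    socket.on('data', (chunk) => {
      socket.write(chunk);
    });
    socket.on('error', (err) => {
      console.error(`Socket error: ${err.message}`);
    });
  });
}

// Start listening and resolve with the port the server is actually bound to.
function listen(server: net.Server, port: number, host = 'localhost'): Promise<number> {
  return new Promise((resolve, reject) => {
    server.once('error', reject);
    server.listen(port, host, () => {
      const address = server.address() as net.AddressInfo;
      resolve(address.port);
    });
  });
}
```

Calling listen(createEchoServer(), 3000) starts the server on the port the client above expects. Passing 0 as the port lets the operating system pick a free one, which is handy in tests.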

To send the contents of an array to a TCP server asynchronously and in a multithreaded manner in TypeScript

You can use the built-in worker_threads module in Node.js. Here's an example code snippet that demonstrates how to do this:

import * as net from 'net';
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';

const host = 'localhost';
const port = 3000;

const data = [1, 2, 3, 4, 5];

if (isMainThread) {
  // Hand the connection details and payload to the worker through workerData
  const worker = new Worker(__filename, { workerData: { host, port, data } });
  worker.on('message', (message) => {
    console.log(message);
  });
} else {
  const socket = new net.Socket();
  const { host, port, data } = workerData;

  socket.connect(port, host, () => {
    console.log(`Connected to ${host}:${port}`);
    // also can send text data
    // const message = 'Hello, server!';
    // socket.write(message);

    // if you need to send text data that contains non-ASCII characters,
    // you should ensure that the text is properly encoded before sending
    // it. Node.js supports various text encodings, such as UTF-8,
    // which can be specified when creating a Buffer object or when
    // calling the toString() method on a Buffer object.
    socket.write(JSON.stringify(data));
  });

  socket.on('data', (data) => {
    console.log(`Received: ${data}`);
    socket.destroy();
  });

  socket.on('close', () => {
    console.log('Connection closed');
    // parentPort is null in the main thread, so guard the call
    parentPort?.postMessage('Done');
  });
}

In this code snippet, we first import the net module, along with Worker, isMainThread, parentPort, and workerData from the worker_threads module.

We define the host and port variables that specify the TCP server's address, as well as the data array that we want to send to the server.

Next, we check whether we are in the main thread by reading isMainThread, which is a boolean constant rather than a function. If we are, we create a new worker and hand it the host, port, and data values through the workerData option of the Worker constructor. We register an event listener for the message event on the worker to receive messages from it.

If we are in a worker thread, we read the host, port, and data values back from workerData. We create a new net.Socket object and call its connect method to connect to the TCP server at the specified host and port. In the callback function, we use the write method to send the contents of the array to the server after converting it to a JSON string.

We register event listeners for the data and close events on the socket. The data event is fired when the server sends a response back, and we log the received data to the console. The close event is fired when the socket is closed, and we log a message to the console to indicate that the connection was closed. We use the parentPort object to post a message to the parent thread when the connection is closed.

To run this TypeScript code, you can use a tool like ts-node or compile it to JavaScript and run it with node. Compiling first tends to be the more reliable option, because the worker is started from __filename and may not have the TypeScript loader available. The main thread creates a new worker and passes it the host, port, and data values. The worker thread connects to the server and sends the data to it asynchronously. Multiple worker threads can be created in parallel to send data to the server in a multithreaded manner.
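One way to use several workers at once is to split the array into slices and start one worker per slice. The following is a sketch under assumptions: partition and spawnWorkers are hypothetical helpers, and workerFile stands for the path to a compiled worker script that reads host, port, and data from workerData as in the example above.

```typescript
import { Worker } from 'worker_threads';

// Split `items` into at most `parts` contiguous slices of near-equal size.
function partition<T>(items: T[], parts: number): T[][] {
  const size = Math.ceil(items.length / parts);
  const slices: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    slices.push(items.slice(i, i + size));
  }
  return slices;
}

// Start one worker per slice; each worker receives only its own share of the data.
function spawnWorkers(
  workerFile: string,
  host: string,
  port: number,
  data: number[],
  count: number,
): Worker[] {
  return partition(data, count).map(
    (slice) => new Worker(workerFile, { workerData: { host, port, data: slice } }),
  );
}
```

Each worker opens its own connection, so the server sees one socket per worker. Whether this raises throughput depends on where the bottleneck is: socket writes are already asynchronous, so extra threads mainly help when preparing the data is CPU-bound.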

To throttle socket.write() calls in TypeScript

You can use a combination of the setTimeout() function and a queue to limit the rate at which data is sent to the socket.

Here’s an example implementation:

import * as net from 'net';

const host = 'localhost';
const port = 3000;

const socket = new net.Socket();
const dataQueue: string[] = [];
let isWriting = false;

socket.connect(port, host, () => {
  console.log(`Connected to ${host}:${port}`);
});

function writeData() {
  if (!dataQueue.length || isWriting) {
    return;
  }

  const data = dataQueue.shift() as string; // Safe: the queue is non-empty here
  isWriting = true;

  socket.write(data, () => {
    console.log(`Sent data: ${data}`);
    // Keep isWriting set during the delay so that a new sendData() call
    // cannot bypass the throttle
    setTimeout(() => {
      isWriting = false;
      writeData();
    }, 1000); // Wait 1 second before sending more data
  });
}

function sendData(data: string) {
  dataQueue.push(data);
  writeData();
}

// Example usage
sendData('Hello, server!');
sendData('How are you?');
sendData('Goodbye!');

In this code snippet, we create a new net.Socket object and connect it to a TCP server at the specified host and port.

We define a dataQueue array to store the data that we want to send to the server and a boolean isWriting flag to indicate whether we are currently writing data to the socket.

We define a writeData() function that checks whether there is data in the queue and whether a write is already in progress. If the queue is non-empty and no write is in progress, we shift the first element from the queue and set the isWriting flag to true. We then call socket.write() with the data and a callback function that waits one second using setTimeout(), then sets the isWriting flag back to false and calls writeData() again. Because the flag stays set for the whole delay, a sendData() call that arrives during the wait only queues its data.

We define a sendData() function that adds the specified data to the end of the dataQueue array and calls writeData() to attempt to send the data immediately, if possible.

In the example usage at the bottom of the code snippet, we call sendData() multiple times to add data to the queue and send it to the server.

With this implementation, writeData() sends at most one item from the queue per second, which lets you control the rate at which data reaches the server. For finer-grained control over the transmission rate, adjust the delay passed to setTimeout().
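A fixed delay is one way to pace writes. Another, which works alongside it, is to respect the stream's own backpressure signal: write() returns false when its internal buffer is full, and the 'drain' event fires once the buffer has emptied. Here is a minimal sketch, where writeAll is a hypothetical helper that accepts any Writable stream, including a net.Socket:

```typescript
import { once } from 'events';
import { Writable } from 'stream';

// Write all chunks in order, pausing whenever the stream's internal buffer is full.
// Returns the number of times the loop had to wait for 'drain'.
async function writeAll(stream: Writable, chunks: string[]): Promise<number> {
  let pauses = 0;
  for (const chunk of chunks) {
    // write() returns false once buffered data reaches the highWaterMark
    if (!stream.write(chunk)) {
      pauses++;
      await once(stream, 'drain'); // Resume only after the buffer has emptied
    }
  }
  return pauses;
}
```

Unlike the timer-based queue, this adapts to how quickly the receiver actually consumes data, so it slows down only when the connection cannot keep up.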

To throttle socket.write() calls with an array of data and handle multiple requests asynchronously in TypeScript

You can use a combination of a queue, promises, and a loop to limit the rate at which data is sent to the socket.

Here’s an example implementation:

import * as net from 'net';

const host = 'localhost';
const port = 3000;

const socket = new net.Socket();
const dataQueue: string[] = [];

// Add items to the end of the queue; the promise resolves once they are queued
function sendData(data: string[]): Promise<void> {
  dataQueue.push(...data);
  return Promise.resolve();
}

// Write one item and resolve once the write callback reports completion
function writeData(data: string): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    socket.write(data, (err) => {
      if (err) {
        reject(err);
        return;
      }
      console.log(`Sent data: ${data}`);
      resolve();
    });
  });
}

async function processQueue() {
  while (true) {
    const next = dataQueue.shift();
    if (next !== undefined) {
      await writeData(next);
    }
    // Wait 1 second before sending the next item or checking the queue again
    await new Promise<void>((resolve) => setTimeout(resolve, 1000));
  }
}

socket.connect(port, host, () => {
  console.log(`Connected to ${host}:${port}`);
  processQueue().catch((err) => console.error('Error sending data:', err));
});

// Example usage
sendData(['Hello, server!', 'How are you?', 'Goodbye!']);

In this code snippet, we create a new net.Socket object and connect it to a TCP server at the specified host and port.

We define a dataQueue array to store the data that we want to send to the server.

We define a sendData() function that adds the specified data to the end of the dataQueue array using the spread operator (...) and returns a Promise that resolves as soon as the items are queued. Callers can await it, but it does not wait for the data to reach the server.

We define a writeData() function that wraps socket.write() in a Promise. The Promise resolves from the write callback once the item has been written, and rejects if the callback reports an error.

We define a processQueue() function that uses a while loop to drain the queue. On each pass it shifts the first item off the queue, awaits writeData() if an item was available, and then waits one second. The same delay therefore both spaces out consecutive writes and sets how often an empty queue is rechecked.

In the example usage at the bottom of the code snippet, we call sendData() with an array of three strings. They are queued immediately and sent one per second once the connection is established.

With this implementation, callers can queue data at any time while processQueue() limits the rate at which it is sent to the server. If you need more fine-grained control over the rate of data transmission, you can adjust the delay passed to setTimeout().

To read a text file as an array of strings in TypeScript

You can use the fs module that is built into Node.js. Here's an example implementation:

import * as fs from 'fs';

function readFileToArray(filename: string): Promise<string[]> {
  return new Promise<string[]>((resolve, reject) => {
    fs.readFile(filename, 'utf8', (err, data) => {
      if (err) {
        reject(err);
      } else {
        const lines = data.split('\n').map((line) => line.trim());
        resolve(lines);
      }
    });
  });
}

// Example usage
readFileToArray('file.txt')
  .then((lines) => {
    console.log(lines);
  })
  .catch((err) => {
    console.error(err);
  });

In this example, we define a readFileToArray() function that takes a filename parameter and returns a Promise that resolves to an array of strings. The function uses the fs.readFile() method to read the contents of the file with the specified filename. We pass 'utf8' as the second parameter to read the file as a UTF-8 encoded string.

The fs.readFile() method takes a callback function with two parameters: an err parameter that contains any error that occurred during the file read, and a data parameter that contains the contents of the file as a string.

If an error occurred during the file read, we reject the Promise with the error object. Otherwise, we split the data string into an array of lines using the split() method with '\n' as the separator. We then use the map() method to trim whitespace from each line, and resolve the Promise with the resulting array of lines.

In the example usage, we call readFileToArray() with the filename 'file.txt'. The Promise returned by the function is then fulfilled with an array of strings representing the lines in the file. We log the resulting array to the console using console.log(). If an error occurred during the file read, we log the error to the console using console.error().
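One detail of the version above is that a file ending in a newline produces a trailing empty string in the array. If you would rather drop blank lines, a variant built on the promise-based fs API might look like the sketch below; splitLines and readLines are hypothetical names.

```typescript
import { readFile } from 'fs/promises';

// Split text into trimmed lines, dropping lines that are empty after trimming.
function splitLines(text: string): string[] {
  return text
    .split(/\r?\n/)
    .map((line) => line.trim())
    .filter((line) => line.length > 0);
}

// Read a UTF-8 file and return its non-empty lines.
async function readLines(filename: string): Promise<string[]> {
  return splitLines(await readFile(filename, 'utf8'));
}
```

Keeping the splitting logic in its own function makes it easy to test without touching the file system.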

To throttle the sending of a JSON array over a socket using TypeScript and Node.js with async/await

You can use the async and await keywords to wait for each chunk of data to be sent before sending the next chunk. Here's an example implementation:

import * as net from 'net';

async function throttleSocketWrite(socket: net.Socket, data: any[], chunkSize: number, delay: number) {
  for (let i = 0; i < data.length; i += chunkSize) {
    const chunk = JSON.stringify(data.slice(i, i + chunkSize));
    await new Promise<void>((resolve, reject) => {
      setTimeout(() => {
        // The callback runs once the chunk has been written, or with an error if the write failed
        socket.write(chunk, (err) => (err ? reject(err) : resolve()));
      }, delay);
    });
  }
}

// Example usage
const socket = new net.Socket();
socket.connect(8080, 'localhost', () => {
  const data = [{ name: 'Alice', age: 25 }, { name: 'Bob', age: 30 }, { name: 'Charlie', age: 35 }];
  throttleSocketWrite(socket, data, 1, 1000)
    .then(() => {
      console.log('Data sent successfully');
      socket.end();
    })
    .catch((err) => {
      console.error('Error sending data:', err);
      socket.end();
    });
});

In this example, we define an async function called throttleSocketWrite() that takes a socket parameter, a data parameter that represents an array of JSON objects, a chunkSize parameter that specifies the number of objects to send in each chunk, and a delay parameter that specifies the delay in milliseconds between each chunk of data. The function sends the data over the socket in chunks, waiting for each chunk to be sent before sending the next chunk.

The function uses a for loop to iterate over the data array, sending each chunk of data using the socket.write() method. Before sending each chunk, the function waits for the specified delay using the setTimeout() method wrapped in a Promise. Once the delay has elapsed, the function sends the chunk of data over the socket and resolves the Promise.

In the example usage, we create a new socket and connect to a server running on localhost on port 8080. We define an array of JSON objects called data and call throttleSocketWrite() with the socket, the data array, a chunkSize of 1, and a delay of 1000 milliseconds. We use the then() method to log a success message and close the socket when all data has been sent, or the catch() method to log an error message and close the socket if an error occurred during the data transfer.
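Because TCP is a byte stream, the receiver does not see chunk boundaries: two JSON chunks written one after the other can arrive as a single piece of text, or one chunk can arrive split in two. A common remedy is to end each chunk with a newline and have the receiver split on it. The sketch below assumes that framing; encodeFrame and FrameDecoder are hypothetical names, not part of the examples above.

```typescript
// Encode one chunk of records as a single newline-terminated JSON line.
function encodeFrame(records: unknown[]): string {
  return JSON.stringify(records) + '\n';
}

// Accumulates incoming text and returns each complete line as parsed JSON.
class FrameDecoder {
  private buffer = '';

  push(text: string): unknown[][] {
    this.buffer += text;
    const lines = this.buffer.split('\n');
    // The last element is an incomplete line, or '' if the text ended with a newline
    this.buffer = lines.pop() ?? '';
    return lines
      .filter((line) => line.length > 0)
      .map((line) => JSON.parse(line) as unknown[]);
  }
}
```

On the sending side you would write encodeFrame(data.slice(i, i + chunkSize)) instead of the bare JSON string. This works because JSON.stringify, called without an indent argument, never emits a raw newline.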

To throttle the sending of a JSON array over a socket using TypeScript and Node.js with async/await and retry on error sending data

You can modify the previous example to catch write errors, which socket.write() reports through its callback, and retry sending the chunk of data after a certain delay. Here's an example implementation:

import * as net from 'net';

async function throttleSocketWrite(socket: net.Socket, data: any[], chunkSize: number, delay: number, retries: number) {
  for (let i = 0; i < data.length; i += chunkSize) {
    const chunk = JSON.stringify(data.slice(i, i + chunkSize));
    let attempts = 0;
    let success = false;
    while (!success && attempts <= retries) {
      try {
        await new Promise<void>((resolve, reject) => {
          setTimeout(() => {
            // Reject on a write error so the catch block below can retry
            socket.write(chunk, (err) => (err ? reject(err) : resolve()));
          }, delay);
        });
        success = true;
      } catch (err) {
        attempts++;
        console.error(`Error sending data (attempt ${attempts}):`, err);
        await new Promise<void>((resolve) => setTimeout(resolve, delay));
      }
    }
    if (!success) {
      throw new Error(`Failed to send data after ${attempts} attempts`);
    }
  }
}

// Example usage
const socket = new net.Socket();
socket.connect(8080, 'localhost', () => {
  const data = [{ name: 'Alice', age: 25 }, { name: 'Bob', age: 30 }, { name: 'Charlie', age: 35 }];
  throttleSocketWrite(socket, data, 1, 1000, 3)
    .then(() => {
      console.log('Data sent successfully');
      socket.end();
    })
    .catch((err) => {
      console.error('Error sending data:', err);
      socket.end();
    });
});

In this example, we’ve added one parameter to the throttleSocketWrite() function: retries, which specifies the maximum number of times to retry sending a chunk of data before giving up. A local variable, attempts, keeps track of the number of failed attempts for the current chunk.

Within the for loop that iterates over the data array, we've added a while loop that keeps retrying the current chunk until either it is sent successfully or the maximum number of retries has been reached. Inside the while loop, we catch any error reported by socket.write() and log an error message. We then wait for the specified delay before retrying the chunk. If the chunk is sent successfully, we set the success flag to true and continue with the next chunk.

At the end of the for loop, we check if the success flag is false. If so, we throw an error indicating that the data failed to send after the maximum number of retries.

In the example usage, we call throttleSocketWrite() with a retries value of 3. This means that the function will retry each chunk up to 3 times after the initial attempt, for 4 attempts in total, before giving up and throwing an error. If all chunks are sent successfully, the then() method will log a success message and close the socket. If an error occurs during the data transfer, the catch() method will log an error message and close the socket.
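The retry loop above waits the same delay before every attempt. A common refinement is exponential backoff, where the wait doubles after each failure up to a cap. The following is a generic sketch of that idea; backoffDelay, sleep, and withRetry are hypothetical helpers and are not tied to the socket code.

```typescript
// Delay before retry number `attempt` (1-based): baseMs, 2 * baseMs, 4 * baseMs, ... capped at maxMs.
function backoffDelay(attempt: number, baseMs: number, maxMs: number): number {
  return Math.min(maxMs, baseMs * 2 ** (attempt - 1));
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Run `operation` until it succeeds or `retries` retries have been used up.
async function withRetry<T>(
  operation: () => Promise<T>,
  retries: number,
  baseMs: number,
  maxMs: number,
): Promise<T> {
  let attempt = 0;
  while (true) {
    try {
      return await operation();
    } catch (err) {
      attempt++;
      if (attempt > retries) {
        throw err; // Out of retries: surface the last error to the caller
      }
      await sleep(backoffDelay(attempt, baseMs, maxMs));
    }
  }
}
```

With baseMs set to 100 and maxMs to 1000, the waits are 100, 200, 400, and 800 milliseconds, then 1000 for every retry after that. Backing off gives a struggling server time to recover instead of hitting it again at a fixed rate.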
