Week 3 | Lesson 10

Memory & Threads

JVM, JRE, JDK, compiler
Memory management
Multithreading
Coroutines ♡

Java Virtual Machine

JVM

Java Virtual Machine

"Write Once, Run Anywhere"
  • The main purpose of the JVM is to provide a runtime environment for Java applications that is independent of the underlying hardware and operating system.

  • Other programming languages that can run on JVM include Kotlin, Scala, Groovy and Clojure.

  • The JVM also provides tools for runtime performance optimization, memory management (garbage collection), monitoring, and more.

  • Java Virtual Machine is a part of the Java Runtime Environment (JRE).

Java Virtual Machine

There are three ways to look at the JVM.
  1. Specification
    Defines how the JVM should be implemented.
  2. Implementation
    The actual JVM implementation.
  3. Instance
    A running JVM process (created every time you start a Java program).

Since the JVM specification is open, more than one implementation exists. They all follow the specification, but may differ in performance, memory management, and other aspects.

Some of the most popular JVM implementations are:

  • Oracle HotSpot JVM
  • Eclipse OpenJ9
  • GraalVM

JRE, JDK, compiler

JRE, JDK, compiler

JRE - Java Runtime Environment
JRE is the part of Java required to run Java applications. It includes JVM, core libraries, and other components. If you only want to run Java applications, you only need JRE.

JDK - Java Development Kit
You need JDK if you want to develop Java applications. It includes JRE, compiler, and other development tools.

Java Compiler
Java source code is compiled into bytecode, which is then executed by JVM. To do so, you need a Java compiler.

Java Bytecode
Java bytecode is the instruction set for the Java Virtual Machine.

Java Bytecode

Write Once, Run Anywhere

Java bytecode is the intermediate representation of Java code that is output by the Java compiler (javac). It is not machine code for any particular computer and is never executed directly by a CPU.

Instead, the Java bytecode is executed by the Java Virtual Machine (JVM). You can say it is an instruction set for the JVM.

Java Bytecode

Write Once, Run Anywhere

When a Java program is compiled, each class is compiled into a separate bytecode file (with a .class extension). This bytecode is platform independent, which means the same bytecode can run on any device that has a JVM.

						
							package lesson08;

							public class HelloWorld {
								public static void main(String[] args) {
									System.out.println("Hello, World!");
								}
							}
						
					

This compiles to the following bytecode (as shown by javap -c):

						
							Compiled from "HelloWorld.java"
							public class lesson08.HelloWorld {
							  public lesson08.HelloWorld();
								Code:
								   0: aload_0
								   1: invokespecial #1                  // Method java/lang/Object."<init>":()V
								   4: return

							  public static void main(java.lang.String[]);
								Code:
								   0: getstatic     #7                  // Field java/lang/System.out:Ljava/io/PrintStream;
								   3: ldc           #13                 // String Hello, World!
								   5: invokevirtual #15                 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
								   8: return
							}
						
					

Java Memory Management

Java Memory Management

Java memory management is to a large extent automatic.

Automatic memory management was one of the key features of Java when it was first introduced.

As your program runs, the JVM automatically allocates and deallocates memory for variables, objects, methods, and other data structures.

The deallocation process is known as garbage collection (GC), and the process responsible for it is called the garbage collector.

Java GC helps us to avoid memory leaks and optimize memory usage.

However, memory leaks can still occur due to programming errors, for example by holding on to references to objects that are no longer needed.

Java Memory Layout

There are several types of memory spaces in Java, each serving a different purpose.
+---------------------------------------------+
|                  JVM Memory                 |
+---------------------------------------------+
|           Method Area (MetaSpace)           | ← Stores class metadata, method info, static variables
|                                             |   - Loaded class definitions
|                                             |   - Method and field descriptors
|                                             |   - Runtime constant pool
|                                             |   - Shared across all threads
+---------------------------------------------+
|                   Heap                      | ← Stores all class instances and arrays
|                                             |   - Object fields and values
|                                             |   - Managed by Garbage Collector
|                                             |   - Shared across all threads
+---------------------------------------------+
|             Stack (one per thread)          | ← Stores frames for active method calls
|                                             |   - Method arguments and return values
|                                             |   - Local variables and references
|                                             |   - Each thread has its own stack
+---------------------------------------------+
|          Native Method Stack                | ← Used by native (non-Java) methods
|                                             |   - Platform-specific, unmanaged by JVM GC
+---------------------------------------------+
|         Program Counter (PC) Register       | ← Tracks JVM bytecode instruction per thread
|                                             |   - One per thread
|                                             |   - Points to the current instruction
+---------------------------------------------+
					

MetaSpace

MetaSpace is a memory area in the JVM that stores class metadata. It is used to store information about classes, methods, and fields.

MetaSpace memory is allocated when:

  • Classes are loaded by the JVM when they are referenced in the code.
  • Methods are compiled by the JVM when they are called for the first time.
  • Static variables are initialized when the class is loaded.
  • ...

MetaSpace is garbage collected just like the heap memory.

The size of MetaSpace can be controlled using the following JVM options:

  • -XX:MetaspaceSize - sets the initial size of MetaSpace
  • -XX:MaxMetaspaceSize - sets the maximum size of MetaSpace

MetaSpace is a replacement for the Permanent Generation (PermGen) space in older JVM versions. It is allocated in native memory, which means it is not limited by the heap size.

Heap

Heap memory is where all objects (instances of classes) are stored.
Each time an object is created, memory is allocated on the heap at runtime.

Heap memory is shared among all threads, and it is the memory area maintained by the garbage collector.

Heap memory is divided into two main areas:

  • Young Generation - where new objects are created
  • Old Generation - where long-lived objects are stored

Heap memory is used for dynamic memory allocation, which means that the size of the heap can grow and shrink during the execution of the program.

Heap memory is managed by the garbage collector, which automatically reclaims memory that is no longer needed.

You can control the size of the heap using the following options:

  • -Xms sets the initial size of the heap
  • -Xmx sets the maximum size of the heap
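
As a small, hedged sketch (just the standard Runtime API, not a tuning tool), you can inspect the effective heap limits from inside a running program:

	fun main() {
		val rt = Runtime.getRuntime()
		val mb = 1024 * 1024
		println("Max heap (corresponds to -Xmx): ${rt.maxMemory() / mb} MB")
		println("Heap currently reserved:        ${rt.totalMemory() / mb} MB")
		println("Free within reserved heap:      ${rt.freeMemory() / mb} MB")
	}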

Stack

Stack memory is used to store:

  • Method frames (call stack)
  • Local variables and parameters of methods
  • Primitive values (int, float, etc.)
  • References to objects in heap space

Stack memory is allocated when a method is called and deallocated when the method returns.

Each thread in Java has its own JVM stack which is created at the same time as the thread.

Stack memory has a fixed size per thread (configurable with the -Xss JVM option) and is not managed by the garbage collector.

Properties of stack are:

  • Fast access
  • Automatically managed (cleared when function ends)
  • Exists per thread
  • Cannot grow too big (overflow → StackOverflowError; see the sketch below)
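
A minimal sketch of the last point, assuming nothing beyond plain recursion: a function with no base case keeps adding frames until the thread's stack is exhausted.

	// each call adds a new frame to the calling thread's stack
	fun recurse(depth: Int): Int = recurse(depth + 1) + 1

	fun main() {
		try {
			recurse(0)
		} catch (e: StackOverflowError) {
			println("Stack exhausted after too many nested frames")
		}
	}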

Method frame ...

						
							fun sum(a: Int, b: Int): Int {
								val result = a + b
								return result
							}
						
					
Method Frame (for sum):
+-----------------------+
| return address        |
| a = 3                 |
| b = 4                 |
| result = 7            |
| operand stack         |
+-----------------------+
					

Difference Between Stack and Heap

Stack
  • Stores method frames, local variables, and primitive values
  • Memory is allocated and deallocated automatically
  • Fast access, but limited size
  • Each thread has its own stack
  • Used for method calls and local variables
Heap
  • Stores objects and arrays
  • Memory is managed by the garbage collector
  • Slower access, but larger size
  • Shared among all threads
  • Used for dynamic memory allocation

Memory Allocation Examples

1. Primitive Types → Stack (when local)
								
									fun main() {
										val a = 42      // Int – primitive value, allocated on stack
										val b = true    // Boolean – also on stack
									}
								
							
  • These variables are local to the main function → stored in the stack frame.
  • No object instantiation → no heap allocation.
2. Reference Types (Classes) → Heap
								
									class User(val name: String)

									fun main() {
										val user = User("Alice") // user is a reference on stack, actual User object on heap
									}
								
							
  • user reference → stack
  • Actual User object (and its name string) → heap
  • The object stays alive as long as it is reachable through a reference
3. Function-local Objects → Heap
								
									fun createList(): List<Int> {
										val list = listOf(1, 2, 3) // list is a reference on stack, actual List on heap
										return list
									}
								
							
  • list reference → stack
  • Actual List and its contents → heap
  • Returned to caller → remains alive after the function ends
4. Objects Captured in Lambdas → Heap
								
									fun main() {
										val prefix = "Item " // captured in lambda → stored on heap
										val printer = { i: Int -> println(prefix + i) }

										printer(5)
									}
								
							
  • Lambdas that capture variables allocate a closure object on the heap.
  • If prefix were not captured, the lambda could be compiled more efficiently.
5. Top-Level / object Singleton → Heap (once)
								
									object Logger {
										fun log(msg: String) {
											println(msg)
										}
									}
								
							
  • The Logger object is allocated once and lives in heap/meta space.
  • Static-like structure with JVM guarantees.
6. Array Allocation → Heap
								
									val nums = IntArray(5) // array is always allocated on heap
								
							
  • Arrays, even of primitives, live on heap.
  • Elements of primitive type (Int) are stored inline; objects would be references.
7. Inline Functions and Reified Types → Reduced Allocation
								
									inline fun <reified T> printType() {
										println(T::class.java.name)
									}
								
							
  • Inlined at call site → may reduce allocation.
  • No closure or lambda object if no capturing → no heap cost.

Memory Allocation Examples

Here is an example of what to avoid when allocating memory.

Example: Memory Leak Due to Static Reference

						
							object Leaky {
								var cache: MutableList<Any> = mutableListOf()
							}

							fun main() {
								repeat(1_000_000) {
									Leaky.cache.add(ByteArray(1024 * 1024)) // 1MB
								}
							}
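
Because Leaky is a singleton object, every ByteArray added to cache stays reachable for the whole lifetime of the program and can never be collected, which eventually ends in an OutOfMemoryError. A hedged sketch of one way to avoid this (the bound of 100 entries is arbitrary and the cache is not thread-safe; it only illustrates releasing references):

	object BoundedCache {
		private const val MAX_ENTRIES = 100
		private val cache = ArrayDeque<ByteArray>()

		fun add(entry: ByteArray) {
			if (cache.size == MAX_ENTRIES) {
				cache.removeFirst() // drop the oldest entry so it becomes unreachable and collectable
			}
			cache.addLast(entry)
		}
	}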
						
					

Garbage Collection

Garbage Collection (GC) is a process of automatically reclaiming memory.

The garbage collector automatically frees heap allocations that are no longer referenced by any running part of the program.

The process of GC is not predictable, and the programmer can’t force garbage collection. System.gc() can be called as a hint to JVM for garbage collection, but it is not guaranteed that it will be performed.

To make the garbage collection process more efficient, the heap is divided into generations.

  • Young Generation (Eden)
  • Old Generation (Tenured)
  • Meta Space

Java uses a generational garbage collection strategy that categorizes objects by age. This is because performing GC on the entire heap would be inefficient. Most objects in Java are short-lived; therefore, there can be more frequent GC events for those.

Garbage Collection Generations

  • Young Generation (Eden)
    This is where all new objects are created. It can be further divided into Eden space and Survivor spaces (FromSpace and ToSpace).
    • When it fills up, a Minor GC event occurs.
    • Objects are evaluated as either alive (still reachable) or dead.
    • Dead objects are removed, and the memory is compacted.
    • If an object survives a given number of minor GC cycles, it is promoted to the Old Generation.
  • Old Generation (Tenured)
    This contains objects that have survived the garbage collection from the Young Generation.
  • Meta Space
    Stores class metadata and is GC-managed, but separate from the heap and generational GC.
+-------------------------------------------------------------+
|                    Young Generation                         |
| - New objects are created here                              |
| - Frequent GC (Minor GC)                                    |
|                                                             |
| +--------------------+ +----------------+ +---------------+ |
| |      Eden Space    | |  Survivor S0   | | Survivor S1   | |
| |  - All new objects | |  - After GC,   | |  - Used as    | |
| |    allocated here  | |    surviving   | |    swap space | |
| |                    | |    objects move| |    in next    | |
| |  - GC clears this  | |    here        | |    GC round   | |
| +--------------------+ +----------------+ +---------------+ |
|                                                             |
|  ↳ Minor GC: clears Eden, moves surviving objects ↔ S0/S1   |
|  ↳ After several GC cycles, surviving objects promoted ↓    |
+-----------------------------+-------------------------------+
                              |
                              v
+-------------------------------------------------------------+
|                   Old Generation (Tenured)                  |
| - Long-living objects reside here                           |
| - Infrequent GC (Major GC or Full GC)                       |
| - Larger memory footprint                                   |
|                                                             |
| ↳ Promotion happens when object survives multiple GC        |
| ↳ GC here is slower and causes application pauses           |
+-----------------------------+-------------------------------+
                              |
                              v
+-------------------------------------------------------------+
|             MetaSpace (was: Permanent Generation)           |
| - Stores class metadata and method information              |
| - Classloader info, static fields, etc.                     |
| - Not part of the heap (since Java 8)                       |
|                                                             |
| ↳ Still GC-managed, but independently of heap objects       |
+-------------------------------------------------------------+
  							

Garbage Collection Strategies

There are a number of GC strategies that can be used in Java. Each strategy has its own advantages and disadvantages, and is suitable for different types of applications.
  • Serial Collector -XX:+UseSerialGC
  • Parallel Collector -XX:+UseParallelGC and -XX:+UseParallelOldGC
  • Concurrent Mark Sweep (CMS) Collector -XX:+UseConcMarkSweepGC
  • G1 (Garbage-First) Collector -XX:+UseG1GC
  • Shenandoah GC -XX:+UseShenandoahGC

Each JVM implementation may implement GC differently, and may have its own GC strategies, although they should all follow the JVM specification.

Multithreading

Multithreading

Multithreading allows execution of multiple parts of a program concurrently, using lightweight processes called threads. It aims to maximize the use of CPU time.

Generally, there is always at least one thread running in a Java program - the main thread.

To create a new thread, you create a new instance of the Thread class. To start it, you call the start() method on that instance.

Another way to create a thread is by implementing the Runnable interface and passing an instance of it to a new thread.

Alternatively, we can use the Executor Framework provided by java.util.concurrent.

Synchronization in Java / Kotlin is an important feature that allows only one thread at a time to access a shared resource.

Thread

The Java way.

Thread is a class in Java that allows you to create and manage threads. You can create a thread directly, or by extending the Thread class and overriding its run() method.

						
							fun main() {

								val thread = Thread {
									println("Hello from '${Thread.currentThread().name}' thread")
								}

								thread.start()

								println("Hello from '" + Thread.currentThread().name + "' thread")
							}
						
					
						
							fun main() {
								MyThread().start()
							}

							class MyThread : Thread() {
								override fun run() {
									println("Hello from '${currentThread().name}' thread");
								}
							}
						
					

Thread

Thread is started by calling the start() method. When the start() method is called, the JVM calls the run() method of the thread.

When the run() method finishes, the thread is considered to be terminated.

If at any time you want to stop a thread, you can call its interrupt() method. Interruption is only a request: the thread must check its interrupted status (or handle InterruptedException) to actually stop.

To wait for a thread to finish, you can call the join() method. However, beware that this will block the current thread until the other thread finishes.
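
A minimal sketch of this cooperative interrupt-and-join pattern (the thread name and timings are illustrative):

	import kotlin.concurrent.thread

	fun main() {
		val worker = thread(name = "worker") {
			try {
				while (!Thread.currentThread().isInterrupted) {
					Thread.sleep(100) // throws InterruptedException if interrupted while sleeping
				}
			} catch (e: InterruptedException) {
				println("Worker interrupted, cleaning up")
			}
		}

		Thread.sleep(300)
		worker.interrupt() // request the worker to stop
		worker.join()      // block until the worker has finished
		println("Worker finished")
	}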

Thread

The Kotlin way

While we can use the Java Thread class directly in Kotlin, Kotlin, as usual, provides a more concise way to create and manage threads.

You can use the thread function to create a new thread.

The function simplifies the creation of a new thread by allowing you to configure the thread properties in a more concise way. The arguments available are:

  • start: Boolean = true - start the thread immediately
  • isDaemon: Boolean = false - create a daemon thread
  • contextClassLoader: ClassLoader? = null - class loader to use for loading classes and resources
  • name: String? = null - name of the thread
  • priority: Int = -1 - priority of the thread (valid values 1..10; -1 keeps the default)
  • block: () -> Unit - code to be executed in the thread
						
							fun main() {
								// note: priority must be within 1..10, and a daemon thread may not get to run before the JVM exits
								thread(start = true, isDaemon = true, name = "my-thread", priority = 10) {
									println("Hello from '${Thread.currentThread().name}' thread")
								}

								println("Hello from '" + Thread.currentThread().name + "' thread")
							}
						
					

Runnable

Runnable is a Java interface that represents a task to be executed by a thread.

To use Runnable, you pass an instance of Runnable to a new Thread.

						
							class MyRunnable : Runnable {
								override fun run() {
									println("Hello from '${Thread.currentThread().name}' thread")
								}
							}
						
					
						
							fun main() {
								val thread = Thread(MyRunnable(), "Runner 1")

								println("Starting thread '${thread.name}'")

								thread.start()

								try {
									// this will block the main thread until the other thread finishes
									thread.join()
								} catch (e: InterruptedException) {
									Thread.currentThread().interrupt()
									throw RuntimeException(e)
								}

								println("Thread '${thread.name}' finished")
							}
						
					

Memory Synchronization

Memory synchronization ensures that the changes made by one thread to the shared data are visible to other threads.
  • @Volatile

    Used to mark a field as volatile to the JVM. It ensures that all reads of a volatile variable are read directly from main memory, and all writes to a volatile variable are written directly to main memory. By itself, volatile does not provide atomicity, but it ensures visibility.

    								
    									@Volatile
    									private var flag: Boolean = true
    								
    							
  • @Synchronized

    If a method is marked @Synchronized, only one thread can execute it at a time. This also ensures that changes made by one thread to shared data are visible to other threads.

    								
    									@Synchronized
    									fun someMethod() {
    										// ...
    									}
    								
    							
  • synchronized block

    You can also use synchronized block to synchronize access to shared data within a block of code.

    The difference between @Synchronized and a synchronized block is that the former synchronizes an entire method, while the latter synchronizes only a block of code. Synchronizing a small block can be more efficient than synchronizing the whole method, because the lock is held only while the critical section executes instead of for the entire method call.

Memory Synchronization

Remember that incorrect synchronization can lead to issues like race conditions, deadlocks, or data inconsistency. It is advised to avoid shared mutable data between threads and to prefer thread confinement or immutability.

Memory Synchronization

No synchronization - NOT thread safe!
						
							var sharedCounter = 0

							fun main() {

								val thread1 = Thread(::incrementCounter)
								val thread2 = Thread(::incrementCounter)

								thread1.start()
								thread2.start()

								// wait for both threads to finish
								thread1.join()
								thread2.join()

								println("Final Counter Value: $sharedCounter")
							}

							fun incrementCounter() {
								repeat(1000) {
									sharedCounter++
								}
							}
						
					

Memory Synchronization

With synchronization using @Synchronized - thread safe.
						
							@Volatile // doesn't ensure atomicity, but ensures visibility
							var sharedCounter = 0

							fun main() {

								val thread1 = thread { incrementCounter() } // thread { } starts the thread immediately
								val thread2 = thread { incrementCounter() }

								// wait for both threads to finish
								thread1.join()
								thread2.join()

								println("Final Counter Value: $sharedCounter")
							}

							@Synchronized // ensures that only one thread can execute this function at a time, acquire lock on whole function
							fun incrementCounter() {
								repeat(1000) {
									sharedCounter++
								}
							}
						
					

Memory Synchronization

With synchronization with synchronized function - thread safe.
						
							@Volatile // doesn't ensure atomicity, but ensures visibility
							var sharedCounter = 0

							fun main() {

								val thread1 = thread { incrementCounter() } // thread { } starts the thread immediately
								val thread2 = thread { incrementCounter() }

								// wait for both threads to finish
								thread1.join()
								thread2.join()

								println("Final Counter Value: $sharedCounter")
							}

							private val lock = Any() // because incrementCounter is a top-level function, we need a separate object to lock on

							fun incrementCounter() {
								repeat(1000) {
									synchronized(lock) { // ensures atomicity, acquire lock on this object
										sharedCounter++
									}
								}
							}
						
					

java.util.concurrent

Besides the low-level synchronization mechanisms such as volatile and synchronized keywords, Java provides a number of classes and interfaces in the java.util.concurrent package to help with multithreading.

The package includes:

  • Atomic Variables
  • Synchronizers
  • Concurrent Collections
  • Locks
  • Callable and Future
  • Executor Framework

Atomic Variables

The java.util.concurrent.atomic package defines classes that support atomic operations on single variables. All atomic operations are thread-safe.

There are several classes in this package, for example AtomicBoolean, AtomicInteger, AtomicLong, etc.

Here is what you can do with atomic variables ...

Atomic Variables

Atomic Read and Write
You can read or write the value of atomic variables in a thread-safe manner. When you update an atomic variable, it ensures that the new value is immediately visible to other threads.

						
							val atomicInteger = AtomicInteger(0)
							atomicInteger.set(78)
							val value = atomicInteger.get()
						
					

Atomic Update
This allows you to atomically update the value of atomic variables. For atomic integers and longs, there are methods to increment, decrement, and add a value atomically.

						
							val atomicInteger = AtomicInteger(0)
							atomicInteger.incrementAndGet()
							atomicInteger.addAndGet(46)
						
					

Atomic Variables

Compare and Set/Swap (CAS)
It enables you to update the value of a variable only when it has a certain expected value. It's a way of managing concurrency without traditional lock-based synchronization. For example, to atomically update a value only if it is currently equal to 10, you can use:

						
							val atomicInteger = AtomicInteger(10)
							val updated = atomicInteger.compareAndSet(10, 78)
						
					

getAndIncrement, getAndDecrement, getAndAdd
These operations atomically increment, decrement, or add to the value and return the old value.

						
							val atomicInteger = AtomicInteger(0)
							val oldValue = atomicInteger.getAndIncrement()
						
					

Atomic Variables

						
							import java.util.concurrent.atomic.AtomicInteger
							import kotlin.concurrent.thread

							private val counter = AtomicInteger(10)

							fun main() {

								thread(name = "thread-1") {
									while (counter.getAndDecrement() > 0) {
										println("Hello from '" + Thread.currentThread().name + "' thread. Counter = ${counter.get()}");
									}
								}

								thread(name = "thread-2") {
									while (counter.getAndDecrement() > 0) {
										println("Hello from '" + Thread.currentThread().name + "' thread. Counter = ${counter.get()}");
									}
								}
							}
						
					
						
							Hello from 'thread-1' thread. Counter = 9
							Hello from 'thread-1' thread. Counter = 7
							Hello from 'thread-1' thread. Counter = 6
							Hello from 'thread-2' thread. Counter = 8
							Hello from 'thread-1' thread. Counter = 5
							Hello from 'thread-1' thread. Counter = 3
							Hello from 'thread-1' thread. Counter = 2
							Hello from 'thread-1' thread. Counter = 1
							Hello from 'thread-1' thread. Counter = 0
							Hello from 'thread-2' thread. Counter = 4
						
					

Synchronizers

  • Semaphore
    It controls the access to a shared resource through the use of a counter. If the counter is greater than zero, the access is allowed, otherwise the access is denied. This is often used to limit the number of threads that can access a particular resource.
  • CountDownLatch
    It allows one or more threads to wait until a set of operations being performed in other threads completes. Once the count reaches zero, all waiting threads proceed. It is a one-time latch: once it reaches zero, it cannot be reset.
  • CyclicBarrier
    It's used when multiple threads carry out different subtasks whose outputs need to be combined to form the final result. It's called cyclic because it can be reused after the waiting threads are released.
  • Phaser
    It's more flexible than both CountDownLatch and CyclicBarrier. It's called Phaser because it phases all the threads into stages of execution.
  • Exchanger
    It's used to exchange data between two threads. It waits for both the threads to reach the exchange point. If the threads do not appear simultaneously to exchange their objects, they'll be paused until the arrival of the other thread.

Semaphore

						
							import java.util.concurrent.Semaphore
							import kotlin.concurrent.thread

							private val semaphore = Semaphore(1)

							fun main() {
								thread(name = "A") { execute(semaphore) }
								thread(name = "B") { execute(semaphore) }
							}

							fun execute(semaphore: Semaphore) {
								try {
									semaphore.acquire()
									println("Thread '" + Thread.currentThread().name + "' acquired the semaphore")
								} catch (e: InterruptedException) {
									Thread.currentThread().interrupt()
									throw RuntimeException(e)
								} finally {
									println("Thread '" + Thread.currentThread().name + "' is releasing the semaphore")
									semaphore.release()
								}
							}
						
					
						
							Thread 'A' acquired the semaphore
							Thread 'A' is releasing the semaphore
							Thread 'B' acquired the semaphore
							Thread 'B' is releasing the semaphore
						
					

CountDownLatch

						
							import java.util.concurrent.CountDownLatch
							import kotlin.concurrent.thread

							private val latch = CountDownLatch(3)

							fun main() {

								thread(name = "WAITING") {
									println("Thread '${Thread.currentThread().name}' started")
									try {
										latch.await()
									} catch (e: InterruptedException) {
										Thread.currentThread().interrupt()
										throw RuntimeException(e)
									}
									println("Thread '${Thread.currentThread().name}' finished")
								}

								thread(name = "COUNTING") {
									println("Thread '${Thread.currentThread().name}' started")
									while (latch.count > 0) {
										println("Thread '${Thread.currentThread().name}' counting down ${latch.count}...")
										latch.countDown()
									}
									println("Thread '${Thread.currentThread().name}' finished")
								}
							}
						
					
						
							Thread 'COUNTING' started
							Thread 'WAITING' started
							Thread 'COUNTING' counting down 3...
							Thread 'COUNTING' counting down 2...
							Thread 'COUNTING' counting down 1...
							Thread 'COUNTING' finished
							Thread 'WAITING' finished
						
					

CyclicBarrier

						
							import java.util.concurrent.CyclicBarrier
							import kotlin.concurrent.thread

							private var barrier = CyclicBarrier(3) { println("Barrier reached") }

							fun main() {
								thread(name = "A") { execute(barrier) }
								thread(name = "B") { execute(barrier) }
								thread(name = "C") { execute(barrier) }
							}

							fun execute(barrier: CyclicBarrier) {
								try {
									println("Thread '${Thread.currentThread().name}' is waiting on the barrier")
									barrier.await()
									println("Thread '${Thread.currentThread().name}' has passed the barrier")
								} catch (e: Exception) {
									throw RuntimeException(e)
								}
							}
						
					
						
							Thread 'C' is waiting on the barrier
							Thread 'B' is waiting on the barrier
							Thread 'A' is waiting on the barrier
							Barrier reached
							Thread 'A' has passed the barrier
							Thread 'C' has passed the barrier
							Thread 'B' has passed the barrier
						
					

Phaser

						
							import java.util.concurrent.Phaser
							import kotlin.concurrent.thread

							private val phaser = Phaser(2)

							fun main() {
								thread(name = "PRE-PROCESSOR") { preProcessor(phaser) }
								thread(name = "POST-PROCESSOR") { postProcessor(phaser) }
							}

							fun postProcessor(phaser: Phaser) {
								println("Thread '${Thread.currentThread().name}' has arrived. Waiting for others...")
								phaser.arriveAndAwaitAdvance()
								println("Thread '${Thread.currentThread().name}' has finished.")
							}

							fun preProcessor(phaser: Phaser) {
								try {
									Thread.sleep(1000)
								} catch (e: InterruptedException) {
									Thread.currentThread().interrupt()
									throw RuntimeException(e)
								}
								println("Thread '${Thread.currentThread().name}' has arrived.")
								phaser.arriveAndDeregister()
								println("Thread '${Thread.currentThread().name}' has finished.")
							}
						
					
						
							Thread 'POST-PROCESSOR' has arrived. Waiting for others...
							Thread 'PRE-PROCESSOR' has arrived.
							Thread 'PRE-PROCESSOR' has finished.
							Thread 'POST-PROCESSOR' has finished.
						
					

Exchanger

						
							import java.util.concurrent.Exchanger
							import kotlin.concurrent.thread

							private val exchanger = Exchanger<String>()

							fun main() {
								thread(name = "A") { exchange(exchanger) }
								thread(name = "B") { exchange(exchanger) }
							}

							fun exchange(exchanger: Exchanger<String>) {
								try {
									val message = exchanger.exchange("Hello from ${Thread.currentThread().name}")
									println("Thread '${Thread.currentThread().name}' received message: $message")
								} catch (e: InterruptedException) {
									Thread.currentThread().interrupt()
									throw RuntimeException(e)
								}
							}
						
					
						
							Thread 'A' received message: Hello from B
							Thread 'B' received message: Hello from A
						
					

java.util.concurrent

Concurrent Collections
This includes thread-safe collection classes used in place of synchronized wrappers such as Hashtable or Collections.synchronizedMap(Map).
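
A minimal sketch with ConcurrentHashMap (the map name and key are illustrative):

	import java.util.concurrent.ConcurrentHashMap
	import kotlin.concurrent.thread

	fun main() {
		val hits = ConcurrentHashMap<String, Int>()

		val workers = List(4) {
			thread {
				repeat(1000) {
					hits.merge("page", 1) { old, inc -> old + inc } // atomic read-modify-write, no external locking needed
				}
			}
		}
		workers.forEach { it.join() }

		println(hits["page"]) // 4000
	}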

Locks
A more advanced and flexible locking mechanism (e.g., ReentrantLock, ReadWriteLock) compared to intrinsic locking with synchronized.
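
A short sketch using ReentrantLock together with Kotlin's withLock extension (the shared counter is illustrative):

	import java.util.concurrent.locks.ReentrantLock
	import kotlin.concurrent.thread
	import kotlin.concurrent.withLock

	private val lock = ReentrantLock()
	private var counter = 0

	fun main() {
		val workers = List(2) {
			thread {
				repeat(1000) {
					lock.withLock { counter++ } // withLock = lock(); try { ... } finally { unlock() }
				}
			}
		}
		workers.forEach { it.join() }
		println(counter) // 2000
	}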

Callable and Future
Callable tasks are similar to Runnable tasks, but they can return a result and are capable of throwing checked exceptions. A Future represents the result of an asynchronous computation - a way to handle the results of Callable tasks.

Executor Framework
This is a higher-level replacement for working with threads directly. Executors manage a pool of threads, so we do not need to create threads manually in order to run tasks asynchronously.

Future example

Again, Future is one of the objects you may encounter when working with Java libraries in Kotlin.

						
							import java.util.concurrent.Executors
							import java.util.concurrent.Future

							fun main() {
								val messenger = Messenger()
								val message: Future<String> = messenger.receiveMessage()

								while (!message.isDone) {
									println("Waiting for message...")
									try {
										Thread.sleep(500)
									} catch (e: InterruptedException) {
										Thread.currentThread().interrupt()
										throw RuntimeException(e)
									}
								}

								try {
									println("Received message: ${message.get()}")
								} catch (e: Exception) {
									throw RuntimeException(e)
								}
							}

							class Messenger {
								private val executor = Executors.newSingleThreadExecutor()

								fun receiveMessage(): Future<String> {
									return executor.submit<String> {
										Thread.sleep(3000)
										"Hello from future!"
									}
								}
							}
						
					
						
							Waiting for message...
							Waiting for message...
							Waiting for message...
							Waiting for message...
							Waiting for message...
							Waiting for message...
							Received message: Hello from future!
						
					

Coroutines

Coroutines

Coroutines are a lightweight alternative to threads, used for asynchronous programming.

A coroutine is an instance of a suspendable computation. It allows code to run asynchronously, like a thread does, but a coroutine is not bound to any particular thread: it may start executing in one thread, suspend, and resume in another. This makes coroutines cheaper than threads - by design, they are non-blocking and reuse threads.

Coroutines run in the context of a CoroutineScope, which defines the lifecycle of the coroutine. When the scope is cancelled, all coroutines started in that scope are cancelled. A CoroutineScope can only finish when all its inner coroutines are finished.

Coroutines are one of the main features of Kotlin, and while working with them is straightforward, I believe it is a topic for the Kotlin Advanced course.

Coroutines Basics

There are a few basic building blocks for working with coroutines.

Let's explain them with an example:

						
							fun main() = runBlocking {

								launch {
									delay(1000)
									println("Kotlin Coroutines World!")
								}

								println("Hello")

							}
						
					
  • launch - is a coroutine builder. It launches a new coroutine concurrently with the rest of the code, which continues to work independently.
  • delay - suspends the coroutine for a specific time. Suspending a coroutine does not block the underlying thread, but allows other coroutines to run and use the underlying thread for their code.
  • runBlocking - is also a coroutine builder that bridges the non-coroutine world of a regular fun main() and the code with coroutines inside of runBlocking { ... } curly braces.

In this example, Hello is printed first, because the launched coroutine is suspended for 1 second before it prints.

Coroutine Scope

Coroutine scope is a way to manage the lifecycle of coroutines.

Coroutines follow a principle of structured concurrency which means that new coroutines can only be launched in a specific coroutine scope which delimits the lifetime of the coroutine.

  • It ensures that all coroutines are properly managed and not leaking outside of their scope.
  • It also ensures that any errors in the code are properly reported and are never lost.

Besides the coroutine scopes provided by builders such as runBlocking and launch, you can also create your own coroutine scope using the coroutineScope function.

								
									fun main() = runBlocking {
										launch {
											task()
										}
										println("Hello")
									}
								
							
								
									suspend fun task() = coroutineScope {
										delay(1000)
										println("Kotlin Coroutines World!")
									}
								
							
  • coroutineScope - a coroutine builder that creates a new coroutine scope.
    • It allows you to launch new coroutines in the scope of the current coroutine.
    • An outer scope cannot complete until all its children coroutines complete.
  • suspend - a modifier that marks a function as a suspend function.

Suspendable Functions

suspend functions are the building blocks of coroutines.

suspend functions ...

  • can execute long-running operations without blocking the thread
  • can be paused (suspended) and resumed later, without blocking the underlying thread
  • can only be called from other suspend functions or from a coroutine scope
  • may resume their execution in a different thread

If you call a suspend function from a non-suspend function, you will get a compilation error. If you need to cross the asynchronous-to-synchronous boundary, you can use the runBlocking function.

If you call multiple suspend functions like this, they will run sequentially, not in parallel:

						
							fun main() = runBlocking {

								val time = measureTimeMillis {
									val result1 = task1()
									val result2 = task2()
								}

								println("Milliseconds: $time")
							}
						
					

Suspendable Functions

Sequential execution of suspend functions.

If you call multiple suspend functions, by default they run sequentially.

						
							fun main() = runBlocking {

								val time = measureTimeMillis {
									val result1 = task1()
									val result2 = task2()
									println("$result1 $result2")
								}
								println("It took $time ms")
							}

							suspend fun task1(): String {
								delay(1000)
								return "Hello"
							}

							suspend fun task2(): String {
								delay(2000)
								return "Coroutines"
							}
						
					
						
							Hello Coroutines
							It took 3037 ms
						
					

Suspendable Functions

Parallel execution of suspend functions.

To run these functions in parallel, you can use the async builder and call await on the results.

					
						fun main() = runBlocking {

							val time = measureTimeMillis {
								val result1 = async { task1() }
								val result2 = async { task2() }
								println("Result: ${result1.await() + result2.await()}")
							}

							println("It took $time ms")
						}

						suspend fun task1(): String {
							delay(1000)
							return "Hello"
						}

						suspend fun task2(): String {
							delay(2000)
							return "Coroutines"
						}
					
					
						
							Result: HelloCoroutines
							It took 2044 ms
						
					

Coroutine Dispatchers

Coroutines are dispatched onto different threads using Dispatchers.

The coroutine context includes a CoroutineDispatcher that determines what thread or threads the corresponding coroutine uses for its execution. The coroutine dispatcher can confine coroutine execution to a specific thread, dispatch it to a thread pool, or let it run unconfined.

There are some predefined dispatcher types available for specific tasks in Kotlin:

  • Dispatchers.Unconfined - starts in the context of the current thread, but it can be resumed in a different thread.
  • Dispatchers.Default - uses a shared background pool of threads.
  • Dispatchers.IO - uses a shared pool of threads for I/O-bound tasks.

There is also a newSingleThreadContext function that creates a dispatcher with a single thread.

						
							launch { // context of the parent, main runBlocking coroutine
								println("main runBlocking      : I'm working in thread ${Thread.currentThread().name}")
							}
							launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
								println("Unconfined            : I'm working in thread ${Thread.currentThread().name}")
							}
							launch(Dispatchers.Default) { // will get dispatched to DefaultDispatcher
								println("Default               : I'm working in thread ${Thread.currentThread().name}")
							}
							launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
								println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
							}
						
					
						
							Unconfined            : I'm working in thread main
							Default               : I'm working in thread DefaultDispatcher-worker-1
							newSingleThreadContext: I'm working in thread MyOwnThread
							main runBlocking      : I'm working in thread main
						
					
This example is taken directly from the Kotlin documentation.

Shared mutable state and concurrency

You can use the same principles for sharing state between coroutines as you would with threads.

There are some additional mechanisms to handle shared mutable state in coroutines provided by Kotlin such as ...

  • Thread-safe data structures - such as AtomicInteger, Semaphore, etc.
  • Thread confinement - a way to ensure that a specific piece of code runs only in a specific thread.
  • Mutex - a mutual exclusion lock that allows only one coroutine to access a shared resource at a time.

Let's create an example of a function that launches 100 coroutines, where each coroutine repeats an action 1000 times ...

						
							suspend fun massiveRun(action: suspend () -> Unit) {
								val n = 100  // number of coroutines to launch
								val k = 1000 // times an action is repeated by each coroutine
								val time = measureTimeMillis {
									coroutineScope { // scope for coroutines
										repeat(n) {
											launch {
												repeat(k) { action() }
											}
										}
									}
								}
								println("Completed ${n * k} actions in $time ms")
							}
						
					

Shared mutable state and concurrency

Thread-safe data structures

This isn't much different from working with threads. We control access to shared mutable state using thread-safe data structures such as AtomicInteger.

						
							val counter = AtomicInteger()

							fun main() = runBlocking {
								withContext(Dispatchers.Default) {
									massiveRun {
										counter.incrementAndGet()
									}
								}
								println("Counter = $counter")
							}
						
					
This example is taken directly from the Kotlin documentation.

Shared mutable state and concurrency

Thread confinement

Thread confinement is an approach to the problem of shared mutable state where all access to the particular shared state is confined to a single thread.

A) With each increment confined to a single-threaded context ...

						
							val counterContext = newSingleThreadContext("CounterContext")
							var counter = 0

							fun main() = runBlocking {
								withContext(Dispatchers.Default) {
									massiveRun {
										// confine each increment to a single-threaded context
										withContext(counterContext) {
											counter++
										}
									}
								}
								println("Counter = $counter")
							}
						
					

B) With everything confined to a single-threaded context ...

						
							val counterContext = newSingleThreadContext("CounterContext")
							var counter = 0

							fun main() = runBlocking {
								// confine everything to a single-threaded context
								withContext(counterContext) {
									massiveRun {
										counter++
									}
								}
								println("Counter = $counter")
							}
						
					
This example is taken directly from the Kotlin documentation.

Shared mutable state and concurrency

Mutex

Mutex is a Kotlin mutual exclusion lock that allows only one coroutine to access a shared resource at a time.

  • Its lock() is a suspending function, so it does not block the underlying thread.
  • It has lock and unlock functions to delimit a critical section.
  • You can also use the withLock extension function, which implements the lock(); try { ... } finally { unlock() } pattern.
						
							val mutex = Mutex()
							var counter = 0

							fun main() = runBlocking {
								withContext(Dispatchers.Default) {
									massiveRun {
										// protect each increment with lock
										mutex.withLock {
											counter++
										}
									}
								}
								println("Counter = $counter")
							}
						
					
This example is taken directly from the Kotlin documentation.

Coroutine and Channels

Channels are a way to communicate between coroutines.

Kotlin's coroutine channels are a mechanism for communication between coroutines. They allow multiple coroutines to send and receive data, much like blocking queues, but in a non-blocking way. Channels are especially useful in scenarios where coroutines need to share or process streams of data concurrently.

Basic channel operations

  • send(value): Sends a value to the channel. This suspends the sender if the channel is full.
  • receive(): Receives a value from the channel. This suspends the receiver if the channel is empty.
  • close(): Closes the channel, signaling that no more values will be sent.
						
							fun main() = runBlocking {
								val channel = Channel<Int>() // Create a channel of type Int

								// Launch a coroutine to send values
								launch {
									for (i in 1..5) {
										channel.send(i)  // Send values to the channel
										println("Sent: $i")
									}
									channel.close()  // Close the channel after sending all values
								}

								// Receive values from the channel
								for (value in channel) {
									println("Received: $value")
								}
							}
						
					

Here, the sender coroutine sends values to the channel, and the receiver iterates over the channel to receive values until the channel is closed.

Once a channel is closed, no more values can be sent, and the for loop over the channel will terminate gracefully.

Buffered Channels

By default, channels are unbuffered, meaning the sender and receiver must synchronize before data is passed. Buffered channels allow the sender to send multiple items before the receiver has to process them.

					
						val channel = Channel<Int>(capacity = 3) // A buffered channel with capacity 3
					
				
					
						fun main() = runBlocking {
							val channel = Channel<Int>(3) // Buffer size is 3

							launch {
								repeat(5) {
									channel.send(it)
									println("Sent: $it")
								}
								channel.close()
							}

							for (value in channel) {
								println("Received: $value")
							}
						}
					
				

Conflated Channels

Conflated channels retain only the latest sent value and discard previous ones if not yet received. This is useful for scenarios like UI updates, where only the latest state matters.

						
							val conflatedChannel = Channel<Int>(Channel.CONFLATED)
						
					
						
							fun main() = runBlocking {
								val channel = Channel<Int>(Channel.CONFLATED)

								launch {
									channel.send(1)
									channel.send(2)  // earlier values are overwritten before they are received
									channel.send(3)
									channel.close()
								}

								delay(100)  // let the sender run first so the values get conflated
								println("Received: ${channel.receive()}")  // Only the latest value (3) is received
							}
						
					

Use Cases of Channels

  • Producer-Consumer

    Channels are often used to implement pipelines, where data flows through multiple stages, each represented by a coroutine (see the sketch after this list).

  • Streaming Data

    Channels are useful for processing real-time data feeds and event streams.

  • UI Updates

    Channels can be used to propagate the latest state to UI components.

  • Work Distribution

    Channels can be used to split tasks between multiple workers in a coroutine system.
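
A minimal producer-consumer pipeline sketch (the two stages and the value range are illustrative):

	import kotlinx.coroutines.launch
	import kotlinx.coroutines.runBlocking
	import kotlinx.coroutines.channels.Channel

	fun main() = runBlocking {
		val numbers = Channel<Int>()
		val squares = Channel<Int>()

		launch { // stage 1: produce numbers
			for (x in 1..5) numbers.send(x)
			numbers.close()
		}

		launch { // stage 2: square them
			for (n in numbers) squares.send(n * n)
			squares.close()
		}

		for (s in squares) { // final consumer
			println("Received: $s")
		}
	}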

Coroutine Cancellation, Timeouts and Exceptions

Coroutine Cancellation

Handling cancellation, timeouts, and exceptions properly is crucial for building reliable coroutine-based applications. Let’s break down how Kotlin coroutines handle each scenario.

Coroutines in Kotlin are cooperative, meaning they need to actively check for cancellation and respond appropriately.

Coroutines can be cancelled by calling the cancel() function on a coroutine’s Job.

						
							fun main() = runBlocking {
								val job = launch {
									try {
										repeat(1000) { i ->
											println("Job: $i")
											delay(500)  // Suspending function that checks for cancellation
										}
									} finally {
										println("Cleanup")
									}
								}

								delay(2000)  // Let the coroutine run for 2 seconds
								println("Cancelling the job")
								job.cancel()  // Cancel the coroutine
								job.join()    // Wait for the coroutine to complete
								println("Job cancelled")
							}
						
					

The coroutine checks for cancellation during the delay(500) call. Without such cooperative suspending functions (e.g., delay, yield), the coroutine will not notice cancellation unless you check for it manually (e.g., via isActive or ensureActive()).

When a coroutine is cancelled, any cleanup operations (e.g., closing resources) should be performed in the finally block.

Coroutine Cancellation, Timeouts and Exceptions

Timeouts

withTimeout allows you to automatically cancel a coroutine if it takes longer than a specified time.

TimeoutCancellationException is thrown if the timeout is reached.

						
							fun main() = runBlocking {
								try {
									withTimeout(1500) {  // Timeout after 1.5 seconds
										repeat(1000) { i ->
											println("Task: $i")
											delay(500)
										}
									}
								} catch (e: TimeoutCancellationException) {
									println("Coroutine timed out: ${e.message}")
								}
							}
						
					

If you want to handle timeouts without throwing an exception, use withTimeoutOrNull, which returns null if the timeout is exceeded.
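
A small sketch of the non-throwing variant:

	import kotlinx.coroutines.delay
	import kotlinx.coroutines.runBlocking
	import kotlinx.coroutines.withTimeoutOrNull

	fun main() = runBlocking {
		val result = withTimeoutOrNull(1500) {  // returns null instead of throwing on timeout
			repeat(1000) { i ->
				println("Task: $i")
				delay(500)
			}
			"Done"  // would be returned if the block completed in time
		}
		println("Result: $result")  // prints "Result: null" because the timeout is exceeded
	}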

Coroutine Cancellation, Timeouts and Exceptions

Handling Exceptions in Coroutines

Exceptions in coroutines are handled differently depending on the coroutine builder (launch, async, etc.):

  • launch: Exceptions are propagated to the parent and cause the coroutine to cancel.
  • async: Exceptions are not thrown immediately but are propagated when calling await().
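
A short sketch of the async case. supervisorScope is used here so the failure of the async child does not cancel the surrounding scope and is only surfaced through await() (supervisor behaviour is covered on the next slide):

	import kotlinx.coroutines.async
	import kotlinx.coroutines.delay
	import kotlinx.coroutines.runBlocking
	import kotlinx.coroutines.supervisorScope

	fun main() = runBlocking {
		supervisorScope {
			val deferred = async {
				delay(100)
				throw IllegalStateException("async failed")
			}

			try {
				deferred.await() // the exception is rethrown only here
			} catch (e: IllegalStateException) {
				println("Caught: ${e.message}")
			}
		}
		println("Scope is still alive")
	}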

Coroutine Cancellation, Timeouts and Exceptions

Supervisor Jobs and Exception Propagation

SupervisorJob allows you to isolate the failure of one child coroutine from affecting other coroutines in the same scope. In a SupervisorJob, exceptions from one coroutine do not cancel sibling coroutines.

						
							fun main() = runBlocking {
								val supervisor = SupervisorJob()

								val scope = CoroutineScope(coroutineContext + supervisor)

								val job1 = scope.launch {
									println("Job 1 started")
									delay(1000)
									throw Exception("Job 1 failed")
								}

								val job2 = scope.launch {
									println("Job 2 started")
									delay(2000)
									println("Job 2 completed")
								}

								job1.join()
								job2.join()  // Job 2 is not affected by Job 1's failure
							}
						
					
						
							Job 1 started
							Job 2 started
							Exception in thread "main" java.lang.Exception: Job 1 failed
								at MainKt$main$1$job1$1.invokeSuspend(Main.kt:11)
								at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
								...
								at MainKt.main(Main.kt:3)
								at MainKt.main(Main.kt)
								Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@2aafb23c, BlockingEventLoop@2b80d80f]
							Job 2 completed
						
					

Coroutine Best Practices

  1. Always use runBlocking in a limited scope (typically only in main functions or tests).
  2. Prefer CoroutineScope to manage coroutines in more complex applications.
  3. Use withContext(Dispatchers.IO) for blocking I/O operations to avoid blocking the UI or the default dispatcher (see the sketch below).
  4. Use structured concurrency principles to manage coroutine lifecycles (avoid orphaned coroutines).
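
A minimal sketch for point 3, assuming a local file data.txt exists (the file name is purely illustrative):

	import kotlinx.coroutines.Dispatchers
	import kotlinx.coroutines.runBlocking
	import kotlinx.coroutines.withContext
	import java.io.File

	// Blocking file I/O is shifted onto the IO dispatcher so it does not block the caller's thread.
	suspend fun readData(): String = withContext(Dispatchers.IO) {
		File("data.txt").readText() // blocking call, confined to the IO thread pool
	}

	fun main() = runBlocking {
		println(readData())
	}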

Next Lesson

Next Lesson

Validation, Exceptions and Error Handling