Phone Scrubber
Frontend vs. Backend
Most people would jump to making a Next.js or SvelteKit app. I chose HTMX + Django, and the results have been pretty good. I only need reactivity for two things: updating the table as a file moves through the processing queue, and creating a download link for the new file when it finishes. Why have hundreds of node modules for that?
I know I'm biased towards the more old-school tech, but hear me out: it has a proven track record of being maintainable, stable, and scalable, without some of the stranger issues you get with Node and larger JS frameworks in general. Onward to what was built...
I wanted to make my own Result / Queue interactions, and a lot of the existing libraries carry quite a few extra features I don't need. I just wanted a simple queue that does the job, nothing extra.
```python
class Result:
    def __init__(self, ok: bool, message: str):
        self.ok = ok
        self.message = message


class Node:
    def __init__(self, item_id: int):
        self.item_id = item_id
        self.next = None
        self.prev = None

    def set_prev(self, p=None):
        self.prev = p

    def set_next(self, n=None):
        self.next = n


class Queue:
    def __init__(self):
        self.current = 0
        self.head = None
        self.tail = None

    def has_items(self) -> bool:
        return self.current > 0 and self.head is not None

    def enque(self, item_id: int):
        self.current += 1
        n = Node(item_id)
        if self.head is None:
            self.head = self.tail = n
            return
        # New items go on at the head; dequeue pulls from the tail (FIFO).
        self.head.set_prev(n)
        n.set_next(self.head)
        self.head = n

    def dequeue(self) -> int | None:
        # Empty queue: reset the pointers and signal with None.
        if self.current == 0 or self.tail is None:
            self.head = self.tail = None
            self.current = 0
            return None
        self.current -= 1
        result = self.tail.item_id
        if self.tail.prev is not None:
            self.tail.prev.next = None
        self.tail = self.tail.prev
        if self.current == 0 or self.tail is None:
            self.head = self.tail = None
            self.current = 0
        return result
```
This is about as basic a queue as you can make, I think. It is the basis for both the root APIQueue and each user's DocumentQueue (for when they upload files).
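As a quick sanity check, here is a minimal usage sketch showing the FIFO behavior: items enqueued first come out first, and an empty queue returns None. (This condenses the Queue above into one runnable snippet; the doc IDs are made up.)

```python
# Condensed copy of the Queue from above, for a runnable FIFO demo.
class Node:
    def __init__(self, item_id):
        self.item_id = item_id
        self.next = None
        self.prev = None

class Queue:
    def __init__(self):
        self.current = 0
        self.head = None
        self.tail = None

    def has_items(self):
        return self.current > 0 and self.head is not None

    def enque(self, item_id):
        self.current += 1
        n = Node(item_id)
        if self.head is None:
            self.head = self.tail = n
            return
        self.head.prev = n
        n.next = self.head
        self.head = n

    def dequeue(self):
        if self.current == 0 or self.tail is None:
            self.head = self.tail = None
            self.current = 0
            return None
        self.current -= 1
        result = self.tail.item_id
        if self.tail.prev is not None:
            self.tail.prev.next = None
        self.tail = self.tail.prev
        if self.current == 0 or self.tail is None:
            self.head = self.tail = None
        return result

q = Queue()
for doc_id in (1, 2, 3):
    q.enque(doc_id)

assert [q.dequeue(), q.dequeue(), q.dequeue()] == [1, 2, 3]  # FIFO order
assert q.dequeue() is None  # empty queue signals with None
assert not q.has_items()
```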
Fan-In Files, pipe to throttled API queue, create download file
There you go, the app. But seriously: dictionaries in Python can be used safely in multithreaded situations, assuming each thread has its own unique key into that dict. Thus I made a top-level dictionary with each user ID as a key, and each user has a document upload queue inside it.
The UploadQueue is essentially just a dictionary mapping unique user IDs to their own document upload queues. Each person can upload a file, it processes independently, and then it gets put into the APIQueue (which HAS to be throttled, per the freaking API we are using...). Remember: Fan-In -> Hit Throttled API -> Fan-Out.
```python
from .queue import Queue, Result
from .doc_queue import DocumentQueue
from .api_queue import APIQueue


class UserUploadQueue(Queue):
    def __init__(self):
        super().__init__()
        self.map: dict[int, DocumentQueue] = {}

    def enque(self, user_id: int, doc_id: int, api_queue: APIQueue):
        if user_id not in self.map:
            self.map[user_id] = DocumentQueue()
        self.map[user_id].enque(doc_id)
        return self.reprocess(user_id, api_queue)

    def reprocess(self, user_id: int, api_queue: APIQueue) -> Result:
        result = Result(ok=False, message="Document unable to be processed.")
        doc_queue = self.map[user_id]
        if doc_queue and doc_queue.has_items() and not doc_queue.running:
            result = doc_queue.process(api_queue)
        return result
```
And you probably want to see the DocumentQueue as well. The DocumentQueue and the APIQueue have fairly complex processing functions that I am not going to show in full here.
```python
import os

from django.conf import settings
from django.utils import timezone

from .models import PhoneDocument, Record
from .api_queue import APIQueue
from .queue import Queue, Result


class DocumentQueue(Queue):
    def __init__(self):
        super().__init__()
        self.running = False

    def process(self, api_queue: APIQueue) -> Result:
        self.running = True
        good_docs = 0
        bad_docs = 0
        while self.has_items():
            doc_id = self.dequeue()
            # PROCESS RECORDS // HANDLE ERRORS // ETC...
        self.running = False
        if good_docs > 0 and bad_docs == 0:
            message = f"[{good_docs}] Doc(s) Processing"
        elif good_docs == 0 and bad_docs > 0:
            message = f"[{bad_docs}] Doc(s) Failed"
        elif good_docs > 0 and bad_docs > 0:
            message = f"[{good_docs}] Doc(s) Processing and [{bad_docs}] Failed"
        else:
            message = "Something went terribly wrong."
        return Result(
            ok=good_docs > 0,
            message=message,
        )
```
What really happens is the file gets processed on upload, certain info gets persisted to the DB, and then the ID of the uploaded file gets enqueued so as not to cause a ruckus.
The APIQueue is also top-level and thread-safe, but there is only one, because the API we call is restricted to 10 requests/sec. So it has to be done in a way where this process is checked and run whenever the queue has an ID passed to it. Always in order, because even if we all upload a file at once, we can only parse one phone at a time.
Obviously you will want to track duplicates. If you have already made an API call for a number, just reuse the data and save the call for a different phone. Otherwise, let it rip.
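The throttle-plus-dedup idea can be sketched like this (names here are illustrative, not the app's actual code): a fixed cooldown sleep between outbound calls keeps us under the 10 requests/sec cap, and a cache of numbers we've already looked up skips repeat API calls entirely.

```python
from time import sleep

# Illustrative sketch, not the real APIQueue internals: pace outbound
# calls with a cooldown and reuse cached results for known numbers.

COOLDOWN = 0.1  # 10 requests/sec -> at most one call every 0.1s


def lookup(phone: str, cache: dict, api_call) -> dict:
    """Return API data for a phone, hitting the API at most once per number."""
    if phone in cache:
        return cache[phone]          # duplicate: reuse, save the API call
    sleep(COOLDOWN)                  # stay under the rate limit
    cache[phone] = api_call(phone)   # first sighting: let it rip
    return cache[phone]


# Usage with a fake API stand-in:
calls = []

def fake_api(phone):
    calls.append(phone)
    return {"phone": phone, "valid": True}

cache: dict = {}
for p in ["555-0001", "555-0002", "555-0001"]:
    lookup(p, cache, fake_api)

assert calls == ["555-0001", "555-0002"]  # the duplicate never hit the API
```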
Background tasks
I tend to prefer the standard library for things. A single daemonized background thread does the trick for handling the API queue, and the file upload queues are also good to go since they trigger any time an upload happens.
This is the main background daemon that I want running at all times. If there is nothing in the APIQueue, it just waits basically.
```python
import threading
from time import sleep

from .api_queue import APIQueue


class BackgroundProcess(threading.Thread):
    def __init__(self, api_queue: APIQueue):
        super().__init__()
        self.api_queue = api_queue
        self.daemon = True
        self.start()  # the thread starts itself on construction

    def run(self):
        while True:
            sleep(1)
            if not self.api_queue.running:
                self.api_queue.process()
```
And here is the main APIQueue. Again, I won't show everything, but you should get the drift.
```python
import logging
from time import sleep

from django.utils import timezone

from .models import PhoneDocument, RecordPhoneNumber
from .queue import Queue, Result


class APIQueue(Queue):
    def __init__(self, cooldown: float, api_url: str, api_token: str):
        super().__init__()
        self.cooldown = cooldown
        self.api_url = api_url
        self.api_token = api_token
        self.running = False

    def process(self) -> None:
        self.running = True
        while self.has_items():
            doc_id = self.dequeue()
            if doc_id is not None:
                doc = PhoneDocument.objects.get(id=doc_id)
                records = doc.records.all()
                for record in records:
                    # PROCESS RECORDS // HANDLE ERRORS // ETC...
                    ...
                doc.save()
                logging.info(f"Doc[{doc_id}] successfully processed.")
        self.running = False
```
There can still be zombie phones in the database, though. To circumvent that, a cron job on the server runs every so often to enqueue phones that have been flagged for a queue but never called in the API, thus allowing future duplicates to get parsed. These phones can only get zombied if something crazy happens, like the API server dying somehow or our server going down. I'm sure there are other causes, but my brain can't think of them right now.
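The cron job's selection logic amounts to: find phones that were flagged for the queue but never got an API response, and enqueue them again. A hedged sketch of just that filter, with plain dicts standing in for model rows (the `queued_at` / `api_called_at` field names are hypothetical, and the real version would be a Django ORM query):

```python
# Hypothetical sketch of the zombie-phone sweep the cron job performs.
# Field names (queued_at, api_called_at) are illustrative stand-ins
# for the real model fields.

def find_zombie_ids(records: list[dict]) -> list[int]:
    """IDs of phones flagged for a queue that never got an API call."""
    return [
        r["id"]
        for r in records
        if r["queued_at"] is not None and r["api_called_at"] is None
    ]


records = [
    {"id": 1, "queued_at": "2024-01-01", "api_called_at": "2024-01-01"},
    {"id": 2, "queued_at": "2024-01-01", "api_called_at": None},  # zombie
    {"id": 3, "queued_at": None, "api_called_at": None},          # never queued
]

assert find_zombie_ids(records) == [2]
```

The IDs this returns would just be fed back through the normal enque path, so the regular throttled processing picks them up.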